Diffstat (limited to 'src/api/admin')

-rw-r--r--  src/api/admin/Cargo.toml   |   1 +
-rw-r--r--  src/api/admin/api.rs       |  34 +
-rw-r--r--  src/api/admin/lib.rs       |   1 +
-rw-r--r--  src/api/admin/repair.rs    | 407 +
-rw-r--r--  src/api/admin/router_v2.rs |   1 +

5 files changed, 444 insertions, 0 deletions
diff --git a/src/api/admin/Cargo.toml b/src/api/admin/Cargo.toml
index 7b1ad2f0..9ac099e8 100644
--- a/src/api/admin/Cargo.toml
+++ b/src/api/admin/Cargo.toml
@@ -16,6 +16,7 @@ path = "lib.rs"
 [dependencies]
 format_table.workspace = true
 garage_model.workspace = true
+garage_block.workspace = true
 garage_table.workspace = true
 garage_util.workspace = true
 garage_rpc.workspace = true
diff --git a/src/api/admin/api.rs b/src/api/admin/api.rs
index 4caae02c..48c9ee0b 100644
--- a/src/api/admin/api.rs
+++ b/src/api/admin/api.rs
@@ -81,6 +81,7 @@ admin_endpoints![
 	CreateMetadataSnapshot,
 	GetNodeStatistics,
 	GetClusterStatistics,
+	LaunchRepairOperation,
 
 	// Worker operations
 	ListWorkers,
@@ -99,6 +100,7 @@ local_admin_endpoints![
 	// Node operations
 	CreateMetadataSnapshot,
 	GetNodeStatistics,
+	LaunchRepairOperation,
 	// Background workers
 	ListWorkers,
 	GetWorkerInfo,
@@ -663,6 +665,38 @@ pub struct GetClusterStatisticsResponse {
 	pub freeform: String,
 }
 
+// ---- LaunchRepairOperation ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalLaunchRepairOperationRequest {
+	pub repair_type: RepairType,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum RepairType {
+	Tables,
+	Blocks,
+	Versions,
+	MultipartUploads,
+	BlockRefs,
+	BlockRc,
+	Rebalance,
+	Scrub(ScrubCommand),
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum ScrubCommand {
+	Start,
+	Pause,
+	Resume,
+	Cancel,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalLaunchRepairOperationResponse;
+
 // **********************************************
 // Worker operations
 // **********************************************
diff --git a/src/api/admin/lib.rs b/src/api/admin/lib.rs
index cc673eef..fe4b0598 100644
--- a/src/api/admin/lib.rs
+++ b/src/api/admin/lib.rs
@@ -17,6 +17,7 @@ mod special;
 
 mod block;
 mod node;
+mod repair;
 mod worker;
 
 use std::sync::Arc;
diff --git a/src/api/admin/repair.rs b/src/api/admin/repair.rs
new file mode 100644
index 00000000..19bb4d51
--- /dev/null
+++ b/src/api/admin/repair.rs
@@ -0,0 +1,407 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::data::*;
+use garage_util::error::{Error as GarageError, OkOrMessage};
+use garage_util::migrate::Migrate;
+
+use garage_table::replication::*;
+use garage_table::*;
+
+use garage_block::manager::BlockManager;
+use garage_block::repair::ScrubWorkerCommand;
+
+use garage_model::garage::Garage;
+use garage_model::s3::block_ref_table::*;
+use garage_model::s3::mpu_table::*;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
+
+use crate::api::*;
+use crate::error::Error;
+use crate::{Admin, RequestHandler};
+
+const RC_REPAIR_ITER_COUNT: usize = 64;
+
+#[async_trait]
+impl RequestHandler for LocalLaunchRepairOperationRequest {
+	type Response = LocalLaunchRepairOperationResponse;
+
+	async fn handle(
+		self,
+		garage: &Arc<Garage>,
+		admin: &Admin,
+	) -> Result<LocalLaunchRepairOperationResponse, Error> {
+		let bg = &admin.background;
+		match self.repair_type {
+			RepairType::Tables => {
+				info!("Launching a full sync of tables");
+				garage.bucket_table.syncer.add_full_sync()?;
+				garage.object_table.syncer.add_full_sync()?;
+				garage.version_table.syncer.add_full_sync()?;
+				garage.block_ref_table.syncer.add_full_sync()?;
+				garage.key_table.syncer.add_full_sync()?;
+			}
+			RepairType::Versions => {
+				info!("Repairing the versions table");
+				bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions));
+			}
+			RepairType::MultipartUploads => {
+				info!("Repairing the multipart uploads table");
+				bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu));
+			}
+			RepairType::BlockRefs => {
+				info!("Repairing the block refs table");
+				bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs));
+			}
+			RepairType::BlockRc => {
+				info!("Repairing the block reference counters");
+				bg.spawn_worker(BlockRcRepair::new(
+					garage.block_manager.clone(),
+					garage.block_ref_table.clone(),
+				));
+			}
+			RepairType::Blocks => {
+				info!("Repairing the stored blocks");
+				bg.spawn_worker(garage_block::repair::RepairWorker::new(
+					garage.block_manager.clone(),
+				));
+			}
+			RepairType::Scrub(cmd) => {
+				let cmd = match cmd {
+					ScrubCommand::Start => ScrubWorkerCommand::Start,
+					ScrubCommand::Pause => {
+						ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24))
+					}
+					ScrubCommand::Resume => ScrubWorkerCommand::Resume,
+					ScrubCommand::Cancel => ScrubWorkerCommand::Cancel,
+				};
+				info!("Sending command to scrub worker: {:?}", cmd);
+				garage.block_manager.send_scrub_command(cmd).await?;
+			}
+			RepairType::Rebalance => {
+				info!("Rebalancing the stored blocks among storage locations");
+				bg.spawn_worker(garage_block::repair::RebalanceWorker::new(
+					garage.block_manager.clone(),
+				));
+			}
+		}
+		Ok(LocalLaunchRepairOperationResponse)
+	}
+}
+
+// ----
+
+#[async_trait]
+trait TableRepair: Send + Sync + 'static {
+	type T: TableSchema;
+
+	fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>;
+
+	async fn process(
+		&mut self,
+		garage: &Garage,
+		entry: <<Self as TableRepair>::T as TableSchema>::E,
+	) -> Result<bool, GarageError>;
+}
+
+struct TableRepairWorker<T: TableRepair> {
+	garage: Arc<Garage>,
+	pos: Vec<u8>,
+	counter: usize,
+	repairs: usize,
+	inner: T,
+}
+
+impl<R: TableRepair> TableRepairWorker<R> {
+	fn new(garage: Arc<Garage>, inner: R) -> Self {
+		Self {
+			garage,
+			inner,
+			pos: vec![],
+			counter: 0,
+			repairs: 0,
+		}
+	}
+}
+
+#[async_trait]
+impl<R: TableRepair> Worker for TableRepairWorker<R> {
+	fn name(&self) -> String {
+		format!("{} repair worker", R::T::TABLE_NAME)
+	}
+
+	fn status(&self) -> WorkerStatus {
+		WorkerStatus {
+			progress: Some(format!("{} ({})", self.counter, self.repairs)),
+			..Default::default()
+		}
+	}
+
+	async fn work(
+		&mut self,
+		_must_exit: &mut watch::Receiver<bool>,
+	) -> Result<WorkerState, GarageError> {
+		let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? {
+			Some((k, v)) => (v, k),
+			None => {
+				info!(
+					"{}: finished, done {}, fixed {}",
+					self.name(),
+					self.counter,
+					self.repairs
+				);
+				return Ok(WorkerState::Done);
+			}
+		};
+
+		let entry = <R::T as TableSchema>::E::decode(&item_bytes)
+			.ok_or_message("Cannot decode table entry")?;
+		if self.inner.process(&self.garage, entry).await? {
+			self.repairs += 1;
+		}
+
+		self.counter += 1;
+		self.pos = next_pos;
+
+		Ok(WorkerState::Busy)
+	}
+
+	async fn wait_for_work(&mut self) -> WorkerState {
+		unreachable!()
+	}
+}
+
+// ----
+
+struct RepairVersions;
+
+#[async_trait]
+impl TableRepair for RepairVersions {
+	type T = VersionTable;
+
+	fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
+		&garage.version_table
+	}
+
+	async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, GarageError> {
+		if !version.deleted.get() {
+			let ref_exists = match &version.backlink {
+				VersionBacklink::Object { bucket_id, key } => garage
+					.object_table
+					.get(bucket_id, key)
+					.await?
+					.map(|o| {
+						o.versions().iter().any(|x| {
+							x.uuid == version.uuid && x.state != ObjectVersionState::Aborted
+						})
+					})
+					.unwrap_or(false),
+				VersionBacklink::MultipartUpload { upload_id } => garage
+					.mpu_table
+					.get(upload_id, &EmptyKey)
+					.await?
+					.map(|u| !u.deleted.get())
+					.unwrap_or(false),
+			};
+
+			if !ref_exists {
+				info!("Repair versions: marking version as deleted: {:?}", version);
+				garage
+					.version_table
+					.insert(&Version::new(version.uuid, version.backlink, true))
+					.await?;
+				return Ok(true);
+			}
+		}
+
+		Ok(false)
+	}
+}
+
+// ----
+
+struct RepairBlockRefs;
+
+#[async_trait]
+impl TableRepair for RepairBlockRefs {
+	type T = BlockRefTable;
+
+	fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
+		&garage.block_ref_table
+	}
+
+	async fn process(
+		&mut self,
+		garage: &Garage,
+		mut block_ref: BlockRef,
+	) -> Result<bool, GarageError> {
+		if !block_ref.deleted.get() {
+			let ref_exists = garage
+				.version_table
+				.get(&block_ref.version, &EmptyKey)
+				.await?
+				.map(|v| !v.deleted.get())
+				.unwrap_or(false);
+
+			if !ref_exists {
+				info!(
+					"Repair block ref: marking block_ref as deleted: {:?}",
+					block_ref
+				);
+				block_ref.deleted.set();
+				garage.block_ref_table.insert(&block_ref).await?;
+				return Ok(true);
+			}
+		}
+
+		Ok(false)
+	}
+}
+
+// ----
+
+struct RepairMpu;
+
+#[async_trait]
+impl TableRepair for RepairMpu {
+	type T = MultipartUploadTable;
+
+	fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
+		&garage.mpu_table
+	}
+
+	async fn process(
+		&mut self,
+		garage: &Garage,
+		mut mpu: MultipartUpload,
+	) -> Result<bool, GarageError> {
+		if !mpu.deleted.get() {
+			let ref_exists = garage
+				.object_table
+				.get(&mpu.bucket_id, &mpu.key)
+				.await?
+				.map(|o| {
+					o.versions()
+						.iter()
+						.any(|x| x.uuid == mpu.upload_id && x.is_uploading(Some(true)))
+				})
+				.unwrap_or(false);
+
+			if !ref_exists {
+				info!(
+					"Repair multipart uploads: marking mpu as deleted: {:?}",
+					mpu
+				);
+				mpu.parts.clear();
+				mpu.deleted.set();
+				garage.mpu_table.insert(&mpu).await?;
+				return Ok(true);
+			}
+		}
+
+		Ok(false)
+	}
+}
+
+// ===== block reference counter repair =====
+
+pub struct BlockRcRepair {
+	block_manager: Arc<BlockManager>,
+	block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
+	cursor: Hash,
+	counter: u64,
+	repairs: u64,
+}
+
+impl BlockRcRepair {
+	fn new(
+		block_manager: Arc<BlockManager>,
+		block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
+	) -> Self {
+		Self {
+			block_manager,
+			block_ref_table,
+			cursor: [0u8; 32].into(),
+			counter: 0,
+			repairs: 0,
+		}
+	}
+}
+
+#[async_trait]
+impl Worker for BlockRcRepair {
+	fn name(&self) -> String {
+		format!("Block refcount repair worker")
+	}
+
+	fn status(&self) -> WorkerStatus {
+		WorkerStatus {
+			progress: Some(format!("{} ({})", self.counter, self.repairs)),
+			..Default::default()
+		}
+	}
+
+	async fn work(
+		&mut self,
+		_must_exit: &mut watch::Receiver<bool>,
+	) -> Result<WorkerState, GarageError> {
+		for _i in 0..RC_REPAIR_ITER_COUNT {
+			let next1 = self
+				.block_manager
+				.rc
+				.rc_table
+				.range(self.cursor.as_slice()..)?
+				.next()
+				.transpose()?
+				.map(|(k, _)| Hash::try_from(k.as_slice()).unwrap());
+			let next2 = self
+				.block_ref_table
+				.data
+				.store
+				.range(self.cursor.as_slice()..)?
+				.next()
+				.transpose()?
+				.map(|(k, _)| Hash::try_from(&k[..32]).unwrap());
+			let next = match (next1, next2) {
+				(Some(k1), Some(k2)) => std::cmp::min(k1, k2),
+				(Some(k), None) | (None, Some(k)) => k,
+				(None, None) => {
+					info!(
+						"{}: finished, done {}, fixed {}",
+						self.name(),
+						self.counter,
+						self.repairs
+					);
+					return Ok(WorkerState::Done);
+				}
+			};
+
+			if self.block_manager.rc.recalculate_rc(&next)?.1 {
+				self.repairs += 1;
+			}
+			self.counter += 1;
+			if let Some(next_incr) = next.increment() {
+				self.cursor = next_incr;
+			} else {
+				info!(
+					"{}: finished, done {}, fixed {}",
+					self.name(),
+					self.counter,
+					self.repairs
+				);
+				return Ok(WorkerState::Done);
+			}
+		}
+
+		Ok(WorkerState::Busy)
+	}
+
+	async fn wait_for_work(&mut self) -> WorkerState {
+		unreachable!()
+	}
+}
diff --git a/src/api/admin/router_v2.rs b/src/api/admin/router_v2.rs
index a0f415c2..4d5c015e 100644
--- a/src/api/admin/router_v2.rs
+++ b/src/api/admin/router_v2.rs
@@ -63,6 +63,7 @@ impl AdminApiRequest {
 		POST CreateMetadataSnapshot (default::body, query::node),
 		GET GetNodeStatistics (default::body, query::node),
 		GET GetClusterStatistics (),
+		POST LaunchRepairOperation (body_field, query::node),
 		// Worker APIs
 		POST ListWorkers (body_field, query::node),
 		POST GetWorkerInfo (body_field, query::node),
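
Usage sketch (an editor's illustration, not part of the patch): the wire format of the new request follows from serde's default externally-tagged enum representation together with the rename_all = "camelCase" attributes on RepairType and ScrubCommand above. The snippet below assumes only that serde (with the derive feature) and serde_json are available, and copies the type definitions from api.rs:

	use serde::{Deserialize, Serialize};

	#[derive(Debug, Clone, Serialize, Deserialize)]
	pub struct LocalLaunchRepairOperationRequest {
		pub repair_type: RepairType,
	}

	#[derive(Debug, Clone, Serialize, Deserialize)]
	#[serde(rename_all = "camelCase")]
	pub enum RepairType {
		Tables,
		Blocks,
		Versions,
		MultipartUploads,
		BlockRefs,
		BlockRc,
		Rebalance,
		Scrub(ScrubCommand),
	}

	#[derive(Debug, Clone, Serialize, Deserialize)]
	#[serde(rename_all = "camelCase")]
	pub enum ScrubCommand {
		Start,
		Pause,
		Resume,
		Cancel,
	}

	fn main() {
		// Unit variants serialize as bare camelCase strings.
		let req = LocalLaunchRepairOperationRequest {
			repair_type: RepairType::BlockRc,
		};
		assert_eq!(
			serde_json::to_string(&req).unwrap(),
			r#"{"repair_type":"blockRc"}"#
		);

		// The newtype Scrub variant is externally tagged, so the
		// scrub command nests under the variant name.
		let scrub = LocalLaunchRepairOperationRequest {
			repair_type: RepairType::Scrub(ScrubCommand::Start),
		};
		assert_eq!(
			serde_json::to_string(&scrub).unwrap(),
			r#"{"repair_type":{"scrub":"start"}}"#
		);
	}

Assuming the v2 admin API's usual conventions (a POST /v2/LaunchRepairOperation route as registered in router_v2.rs, bearer-token authentication, and the node selector in the query string; verify all of these against the deployed version), an invocation might look like the following, where GARAGE_ADMIN_TOKEN and the port are placeholders:

	curl -X POST \
		-H "Authorization: Bearer $GARAGE_ADMIN_TOKEN" \
		-d '{ "repair_type": "tables" }' \
		"http://localhost:3903/v2/LaunchRepairOperation?node=self"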