diff options
author | Alex Auvolat <alex@adnab.me> | 2023-01-04 11:34:43 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-01-04 11:34:43 +0100 |
commit | 570e5e5bbb7a3eac41350db9433e28ed289b97f4 (patch) | |
tree | a7fc299ba180098be5a3bef28a39256870ce697b /src/table/gc.rs | |
parent | 6e44369cbc810b8912ca0f7f5fd293e87f10c851 (diff) | |
parent | 4eb8ca3a528dae2848141f5cc3eb607eb7d40114 (diff) | |
download | garage-570e5e5bbb7a3eac41350db9433e28ed289b97f4.tar.gz garage-570e5e5bbb7a3eac41350db9433e28ed289b97f4.zip |
Merge branch 'main' into next
Diffstat (limited to 'src/table/gc.rs')
-rw-r--r-- | src/table/gc.rs | 58 |
1 files changed, 17 insertions, 41 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs index 83e7eeff..5b9124a7 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -31,7 +31,7 @@ const TABLE_GC_BATCH_SIZE: usize = 1024; // and the moment the garbage collection actually happens) const TABLE_GC_DELAY: Duration = Duration::from_secs(24 * 3600); -pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> { +pub(crate) struct TableGc<F: TableSchema, R: TableReplication> { system: Arc<System>, data: Arc<TableData<F, R>>, @@ -49,29 +49,26 @@ impl Rpc for GcRpc { type Response = Result<GcRpc, Error>; } -impl<F, R> TableGc<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ - pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> { +impl<F: TableSchema, R: TableReplication> TableGc<F, R> { + pub(crate) fn new(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> { let endpoint = system .netapp .endpoint(format!("garage_table/gc.rs/Rpc:{}", F::TABLE_NAME)); let gc = Arc::new(Self { - system: system.clone(), + system, data, endpoint, }); - gc.endpoint.set_handler(gc.clone()); - system.background.spawn_worker(GcWorker::new(gc.clone())); - gc } + pub(crate) fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) { + bg.spawn_worker(GcWorker::new(self.clone())); + } + async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> { let now = now_msec(); @@ -276,11 +273,7 @@ where } #[async_trait] -impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> { async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> { match message { GcRpc::Update(items) => { @@ -298,20 +291,12 @@ where } } -struct GcWorker<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +struct GcWorker<F: TableSchema, R: TableReplication> { gc: Arc<TableGc<F, R>>, wait_delay: Duration, } -impl<F, R> GcWorker<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> GcWorker<F, R> { fn new(gc: Arc<TableGc<F, R>>) -> Self { Self { gc, @@ -321,21 +306,15 @@ where } #[async_trait] -impl<F, R> Worker for GcWorker<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> { fn name(&self) -> String { format!("{} GC", F::TABLE_NAME) } - fn info(&self) -> Option<String> { - let l = self.gc.data.gc_todo_len().unwrap_or(0); - if l > 0 { - Some(format!("{} items in queue", l)) - } else { - None + fn status(&self) -> WorkerStatus { + WorkerStatus { + queue_length: Some(self.gc.data.gc_todo_len().unwrap_or(0) as u64), + ..Default::default() } } @@ -349,10 +328,7 @@ where } } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; - } + async fn wait_for_work(&mut self) -> WorkerState { tokio::time::sleep(self.wait_delay).await; WorkerState::Busy } |