aboutsummaryrefslogtreecommitdiff
path: root/src/table/merkle.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-06-21 13:50:55 +0200
committerAlex Auvolat <alex@adnab.me>2022-06-21 13:50:55 +0200
commit3119ea59b08e62ce14cddeb4809a397785b662bb (patch)
tree08e54b210ba73988ed1ac56db7045f39f3791bdb /src/table/merkle.rs
parente12bc3b5959c0aa5ae3c8a746c62bab2e7343a62 (diff)
downloadgarage-3119ea59b08e62ce14cddeb4809a397785b662bb.tar.gz
garage-3119ea59b08e62ce14cddeb4809a397785b662bb.zip
New worker semantics applied to garage_table
Diffstat (limited to 'src/table/merkle.rs')
-rw-r--r--src/table/merkle.rs67
1 files changed, 34 insertions, 33 deletions
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 7685b193..d4d2717f 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -1,14 +1,13 @@
use std::sync::Arc;
use std::time::Duration;
-use futures::select;
-use futures_util::future::*;
+use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use garage_db as db;
-use garage_util::background::BackgroundRunner;
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::Error;
@@ -78,43 +77,17 @@ where
empty_node_hash,
});
- let ret2 = ret.clone();
- background.spawn_worker(
- format!("Merkle tree updater for {}", F::TABLE_NAME),
- |must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit),
- );
+ background.spawn_worker(MerkleWorker(ret.clone()));
ret
}
- async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
- while !*must_exit.borrow() {
- match self.updater_loop_iter() {
- Ok(true) => (),
- Ok(false) => {
- select! {
- _ = self.data.merkle_todo_notify.notified().fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- }
- Err(e) => {
- warn!(
- "({}) Error while updating Merkle tree item: {}",
- F::TABLE_NAME,
- e
- );
- tokio::time::sleep(Duration::from_secs(10)).await;
- }
- }
- }
- }
-
- fn updater_loop_iter(&self) -> Result<bool, Error> {
+ fn updater_loop_iter(&self) -> Result<WorkerStatus, Error> {
if let Some((key, valhash)) = self.data.merkle_todo.first()? {
self.update_item(&key, &valhash)?;
- Ok(true)
+ Ok(WorkerStatus::Busy)
} else {
- Ok(false)
+ Ok(WorkerStatus::Idle)
}
}
@@ -325,6 +298,34 @@ where
}
}
+struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>)
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static;
+
+#[async_trait]
+impl<F, R> Worker for MerkleWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn name(&self) -> String {
+ format!("Merkle tree updater: {}", F::TABLE_NAME)
+ }
+
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerStatus, Error> {
+ self.0.updater_loop_iter()
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ tokio::time::sleep(Duration::from_secs(10)).await;
+ WorkerStatus::Busy
+ }
+}
+
impl MerkleNodeKey {
fn encode(&self) -> Vec<u8> {
let mut ret = Vec::with_capacity(2 + self.prefix.len());