aboutsummaryrefslogtreecommitdiff
path: root/src/table/merkle.rs
diff options
context:
space:
mode:
authorMendes <mendes.oulamara@pm.me>2022-10-04 18:14:49 +0200
committerMendes <mendes.oulamara@pm.me>2022-10-04 18:14:49 +0200
commit829f815a897b04986559910bbcbf53625adcdf20 (patch)
tree6db3c27cff2aded754a641d1f2b05c83be701267 /src/table/merkle.rs
parent99f96b9564c9c841dc6c56f1255a6e70ff884d46 (diff)
parenta096ced35562bd0a8877a1ee2f755be1edafe343 (diff)
downloadgarage-829f815a897b04986559910bbcbf53625adcdf20.tar.gz
garage-829f815a897b04986559910bbcbf53625adcdf20.zip
Merge remote-tracking branch 'origin/main' into optimal-layout
Diffstat (limited to 'src/table/merkle.rs')
-rw-r--r--src/table/merkle.rs150
1 files changed, 86 insertions, 64 deletions
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 93bf7e47..a5c29723 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -1,15 +1,13 @@
use std::sync::Arc;
use std::time::Duration;
-use futures::select;
-use futures_util::future::*;
+use async_trait::async_trait;
use serde::{Deserialize, Serialize};
-use sled::transaction::{
- ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree,
-};
use tokio::sync::watch;
-use garage_util::background::BackgroundRunner;
+use garage_db as db;
+
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::Error;
@@ -79,43 +77,17 @@ where
empty_node_hash,
});
- let ret2 = ret.clone();
- background.spawn_worker(
- format!("Merkle tree updater for {}", F::TABLE_NAME),
- |must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit),
- );
+ background.spawn_worker(MerkleWorker(ret.clone()));
ret
}
- async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
- while !*must_exit.borrow() {
- if let Some(x) = self.data.merkle_todo.iter().next() {
- match x {
- Ok((key, valhash)) => {
- if let Err(e) = self.update_item(&key[..], &valhash[..]) {
- warn!(
- "({}) Error while updating Merkle tree item: {}",
- F::TABLE_NAME,
- e
- );
- }
- }
- Err(e) => {
- warn!(
- "({}) Error while iterating on Merkle todo tree: {}",
- F::TABLE_NAME,
- e
- );
- tokio::time::sleep(Duration::from_secs(10)).await;
- }
- }
- } else {
- select! {
- _ = self.data.merkle_todo_notify.notified().fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- }
+ fn updater_loop_iter(&self) -> Result<WorkerState, Error> {
+ if let Some((key, valhash)) = self.data.merkle_todo.first()? {
+ self.update_item(&key, &valhash)?;
+ Ok(WorkerState::Busy)
+ } else {
+ Ok(WorkerState::Idle)
}
}
@@ -137,13 +109,16 @@ where
};
self.data
.merkle_tree
- .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?;
+ .db()
+ .transaction(|mut tx| self.update_item_rec(&mut tx, k, &khash, &key, new_vhash))?;
- let deleted = self
- .data
- .merkle_todo
- .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)?
- .is_ok();
+ let deleted = self.data.merkle_todo.db().transaction(|mut tx| {
+ let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by);
+ if remove {
+ tx.remove(&self.data.merkle_todo, k)?;
+ }
+ Ok(remove)
+ })?;
if !deleted {
debug!(
@@ -157,12 +132,12 @@ where
fn update_item_rec(
&self,
- tx: &TransactionalTree,
+ tx: &mut db::Transaction<'_>,
k: &[u8],
khash: &Hash,
key: &MerkleNodeKey,
new_vhash: Option<Hash>,
- ) -> ConflictableTransactionResult<Option<Hash>, Error> {
+ ) -> db::TxResult<Option<Hash>, Error> {
let i = key.prefix.len();
// Read node at current position (defined by the prefix stored in key)
@@ -203,7 +178,7 @@ where
}
MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)),
x @ MerkleNode::Leaf(_, _) => {
- tx.remove(key_sub.encode())?;
+ tx.remove(&self.data.merkle_tree, key_sub.encode())?;
Some(x)
}
}
@@ -283,28 +258,27 @@ where
fn read_node_txn(
&self,
- tx: &TransactionalTree,
+ tx: &mut db::Transaction<'_>,
k: &MerkleNodeKey,
- ) -> ConflictableTransactionResult<MerkleNode, Error> {
- let ent = tx.get(k.encode())?;
- MerkleNode::decode_opt(ent).map_err(ConflictableTransactionError::Abort)
+ ) -> db::TxResult<MerkleNode, Error> {
+ let ent = tx.get(&self.data.merkle_tree, k.encode())?;
+ MerkleNode::decode_opt(&ent).map_err(db::TxError::Abort)
}
fn put_node_txn(
&self,
- tx: &TransactionalTree,
+ tx: &mut db::Transaction<'_>,
k: &MerkleNodeKey,
v: &MerkleNode,
- ) -> ConflictableTransactionResult<Hash, Error> {
+ ) -> db::TxResult<Hash, Error> {
trace!("Put Merkle node: {:?} => {:?}", k, v);
if *v == MerkleNode::Empty {
- tx.remove(k.encode())?;
+ tx.remove(&self.data.merkle_tree, k.encode())?;
Ok(self.empty_node_hash)
} else {
- let vby = rmp_to_vec_all_named(v)
- .map_err(|e| ConflictableTransactionError::Abort(e.into()))?;
+ let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?;
let rethash = blake2sum(&vby[..]);
- tx.insert(k.encode(), vby)?;
+ tx.insert(&self.data.merkle_tree, k.encode(), vby)?;
Ok(rethash)
}
}
@@ -312,15 +286,63 @@ where
// Access a node in the Merkle tree, used by the sync protocol
pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result<MerkleNode, Error> {
let ent = self.data.merkle_tree.get(k.encode())?;
- MerkleNode::decode_opt(ent)
+ MerkleNode::decode_opt(&ent)
+ }
+
+ pub fn merkle_tree_len(&self) -> Result<usize, Error> {
+ Ok(self.data.merkle_tree.len()?)
}
- pub fn merkle_tree_len(&self) -> usize {
- self.data.merkle_tree.len()
+ pub fn todo_len(&self) -> Result<usize, Error> {
+ Ok(self.data.merkle_todo.len()?)
}
+}
+
+struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>)
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static;
- pub fn todo_len(&self) -> usize {
- self.data.merkle_todo.len()
+#[async_trait]
+impl<F, R> Worker for MerkleWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn name(&self) -> String {
+ format!("{} Merkle tree updater", F::TABLE_NAME)
+ }
+
+ fn info(&self) -> Option<String> {
+ let l = self.0.todo_len().unwrap_or(0);
+ if l > 0 {
+ Some(format!("{} items in queue", l))
+ } else {
+ None
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ let updater = self.0.clone();
+ tokio::task::spawn_blocking(move || {
+ for _i in 0..100 {
+ let s = updater.updater_loop_iter();
+ if !matches!(s, Ok(WorkerState::Busy)) {
+ return s;
+ }
+ }
+ Ok(WorkerState::Busy)
+ })
+ .await
+ .unwrap()
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
+ if *must_exit.borrow() {
+ return WorkerState::Done;
+ }
+ tokio::time::sleep(Duration::from_secs(10)).await;
+ WorkerState::Busy
}
}
@@ -347,7 +369,7 @@ impl MerkleNodeKey {
}
impl MerkleNode {
- fn decode_opt(ent: Option<sled::IVec>) -> Result<Self, Error> {
+ fn decode_opt(ent: &Option<db::Value>) -> Result<Self, Error> {
match ent {
None => Ok(MerkleNode::Empty),
Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),