aboutsummaryrefslogtreecommitdiff
path: root/src/table/merkle.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-03-16 11:43:58 +0100
committerAlex Auvolat <alex@adnab.me>2021-03-16 11:43:58 +0100
commit515029d026937d29395379c76188f509984b8ace (patch)
tree0a89cd87079f330d1021a1954a1328654b236e65 /src/table/merkle.rs
parent1d9961e4118af0e26068e1d6c5c6c009a1292a88 (diff)
downloadgarage-515029d026937d29395379c76188f509984b8ace.tar.gz
garage-515029d026937d29395379c76188f509984b8ace.zip
Refactor code
Diffstat (limited to 'src/table/merkle.rs')
-rw-r--r--src/table/merkle.rs61
1 files changed, 34 insertions, 27 deletions
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 60b7833f..8c3dcad9 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -9,12 +9,16 @@ use serde::{Deserialize, Serialize};
use sled::transaction::{
ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree,
};
-use tokio::sync::{watch, Notify};
+use tokio::sync::watch;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
+use crate::data::*;
+use crate::replication::*;
+use crate::schema::*;
+
pub type MerklePartition = [u8; 2];
pub fn hash_of_merkle_partition(p: MerklePartition) -> Hash {
@@ -32,28 +36,30 @@ pub fn hash_of_merkle_partition_opt(p: Option<MerklePartition>) -> Hash {
// 16 bits (two bytes) of item's partition keys' hashes.
// It builds one Merkle tree for each of these 2**16 partitions.
-pub struct MerkleUpdater {
- table_name: String,
+pub struct MerkleUpdater<F: TableSchema, R: TableReplication> {
+ data: Arc<TableData<F, R>>,
background: Arc<BackgroundRunner>,
// Content of the todo tree: items where
// - key = the key of an item in the main table, ie hash(partition_key)+sort_key
// - value = the hash of the full serialized item, if present,
// or an empty vec if item is absent (deleted)
- pub(crate) todo: sled::Tree,
- pub(crate) todo_notify: Notify,
+ // Fields in data:
+ // pub(crate) merkle_todo: sled::Tree,
+ // pub(crate) merkle_todo_notify: Notify,
// Content of the merkle tree: items where
// - key = .bytes() for MerkleNodeKey
// - value = serialization of a MerkleNode, assumed to be MerkleNode::empty if not found
- pub(crate) merkle_tree: sled::Tree,
+ // Field in data:
+ // pub(crate) merkle_tree: sled::Tree,
empty_node_hash: Hash,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MerkleNodeKey {
// partition: first 16 bits (two bytes) of the partition_key's hash
- pub partition: MerklePartition,
+ pub partition: [u8; 2],
// prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key)
#[serde(with = "serde_bytes")]
@@ -74,27 +80,26 @@ pub enum MerkleNode {
Leaf(Vec<u8>, Hash),
}
-impl MerkleUpdater {
+impl<F, R> MerkleUpdater<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
pub(crate) fn launch(
- table_name: String,
+ data: Arc<TableData<F, R>>,
background: Arc<BackgroundRunner>,
- todo: sled::Tree,
- merkle_tree: sled::Tree,
) -> Arc<Self> {
let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
let ret = Arc::new(Self {
- table_name,
+ data,
background,
- todo,
- todo_notify: Notify::new(),
- merkle_tree,
empty_node_hash,
});
let ret2 = ret.clone();
ret.background.spawn_worker(
- format!("Merkle tree updater for {}", ret.table_name),
+ format!("Merkle tree updater for {}", ret.data.name),
|must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit),
);
@@ -103,27 +108,27 @@ impl MerkleUpdater {
async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() {
- if let Some(x) = self.todo.iter().next() {
+ if let Some(x) = self.data.merkle_todo.iter().next() {
match x {
Ok((key, valhash)) => {
if let Err(e) = self.update_item(&key[..], &valhash[..]) {
warn!(
"({}) Error while updating Merkle tree item: {}",
- self.table_name, e
+ self.data.name, e
);
}
}
Err(e) => {
warn!(
"({}) Error while iterating on Merkle todo tree: {}",
- self.table_name, e
+ self.data.name, e
);
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
} else {
select! {
- _ = self.todo_notify.notified().fuse() => (),
+ _ = self.data.merkle_todo_notify.notified().fuse() => (),
_ = must_exit.changed().fuse() => (),
}
}
@@ -143,18 +148,20 @@ impl MerkleUpdater {
partition: k[0..2].try_into().unwrap(),
prefix: vec![],
};
- self.merkle_tree
+ self.data
+ .merkle_tree
.transaction(|tx| self.update_item_rec(tx, k, khash, &key, new_vhash))?;
let deleted = self
- .todo
+ .data
+ .merkle_todo
.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)?
.is_ok();
if !deleted {
debug!(
"({}) Item not deleted from Merkle todo because it changed: {:?}",
- self.table_name, k
+ self.data.name, k
);
}
Ok(())
@@ -197,7 +204,7 @@ impl MerkleUpdater {
// should not happen
warn!(
"({}) Replacing intermediate node with empty node, should not happen.",
- self.table_name
+ self.data.name
);
Some(MerkleNode::Empty)
} else if children.len() == 1 {
@@ -301,7 +308,7 @@ impl MerkleUpdater {
// Access a node in the Merkle tree, used by the sync protocol
pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result<MerkleNode, Error> {
- let ent = self.merkle_tree.get(k.encode())?;
+ let ent = self.data.merkle_tree.get(k.encode())?;
match ent {
None => Ok(MerkleNode::Empty),
Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),
@@ -309,11 +316,11 @@ impl MerkleUpdater {
}
pub fn merkle_tree_len(&self) -> usize {
- self.merkle_tree.len()
+ self.data.merkle_tree.len()
}
pub fn todo_len(&self) -> usize {
- self.todo.len()
+ self.data.merkle_todo.len()
}
}