diff options
author | Alex Auvolat <alex@adnab.me> | 2021-03-16 11:43:58 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-03-16 11:43:58 +0100 |
commit | 515029d026937d29395379c76188f509984b8ace (patch) | |
tree | 0a89cd87079f330d1021a1954a1328654b236e65 /src/table/merkle.rs | |
parent | 1d9961e4118af0e26068e1d6c5c6c009a1292a88 (diff) | |
download | garage-515029d026937d29395379c76188f509984b8ace.tar.gz garage-515029d026937d29395379c76188f509984b8ace.zip |
Refactor code
Diffstat (limited to 'src/table/merkle.rs')
-rw-r--r-- | src/table/merkle.rs | 61 |
1 files changed, 34 insertions, 27 deletions
diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 60b7833f..8c3dcad9 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -9,12 +9,16 @@ use serde::{Deserialize, Serialize}; use sled::transaction::{ ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree, }; -use tokio::sync::{watch, Notify}; +use tokio::sync::watch; use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; +use crate::data::*; +use crate::replication::*; +use crate::schema::*; + pub type MerklePartition = [u8; 2]; pub fn hash_of_merkle_partition(p: MerklePartition) -> Hash { @@ -32,28 +36,30 @@ pub fn hash_of_merkle_partition_opt(p: Option<MerklePartition>) -> Hash { // 16 bits (two bytes) of item's partition keys' hashes. // It builds one Merkle tree for each of these 2**16 partitions. -pub struct MerkleUpdater { - table_name: String, +pub struct MerkleUpdater<F: TableSchema, R: TableReplication> { + data: Arc<TableData<F, R>>, background: Arc<BackgroundRunner>, // Content of the todo tree: items where // - key = the key of an item in the main table, ie hash(partition_key)+sort_key // - value = the hash of the full serialized item, if present, // or an empty vec if item is absent (deleted) - pub(crate) todo: sled::Tree, - pub(crate) todo_notify: Notify, + // Fields in data: + // pub(crate) merkle_todo: sled::Tree, + // pub(crate) merkle_todo_notify: Notify, // Content of the merkle tree: items where // - key = .bytes() for MerkleNodeKey // - value = serialization of a MerkleNode, assumed to be MerkleNode::empty if not found - pub(crate) merkle_tree: sled::Tree, + // Field in data: + // pub(crate) merkle_tree: sled::Tree, empty_node_hash: Hash, } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct MerkleNodeKey { // partition: first 16 bits (two bytes) of the partition_key's hash - pub partition: MerklePartition, + pub partition: [u8; 2], // prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key) #[serde(with = "serde_bytes")] @@ -74,27 +80,26 @@ pub enum MerkleNode { Leaf(Vec<u8>, Hash), } -impl MerkleUpdater { +impl<F, R> MerkleUpdater<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ pub(crate) fn launch( - table_name: String, + data: Arc<TableData<F, R>>, background: Arc<BackgroundRunner>, - todo: sled::Tree, - merkle_tree: sled::Tree, ) -> Arc<Self> { let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); let ret = Arc::new(Self { - table_name, + data, background, - todo, - todo_notify: Notify::new(), - merkle_tree, empty_node_hash, }); let ret2 = ret.clone(); ret.background.spawn_worker( - format!("Merkle tree updater for {}", ret.table_name), + format!("Merkle tree updater for {}", ret.data.name), |must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit), ); @@ -103,27 +108,27 @@ impl MerkleUpdater { async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { while !*must_exit.borrow() { - if let Some(x) = self.todo.iter().next() { + if let Some(x) = self.data.merkle_todo.iter().next() { match x { Ok((key, valhash)) => { if let Err(e) = self.update_item(&key[..], &valhash[..]) { warn!( "({}) Error while updating Merkle tree item: {}", - self.table_name, e + self.data.name, e ); } } Err(e) => { warn!( "({}) Error while iterating on Merkle todo tree: {}", - self.table_name, e + self.data.name, e ); tokio::time::sleep(Duration::from_secs(10)).await; } } } else { select! { - _ = self.todo_notify.notified().fuse() => (), + _ = self.data.merkle_todo_notify.notified().fuse() => (), _ = must_exit.changed().fuse() => (), } } @@ -143,18 +148,20 @@ impl MerkleUpdater { partition: k[0..2].try_into().unwrap(), prefix: vec![], }; - self.merkle_tree + self.data + .merkle_tree .transaction(|tx| self.update_item_rec(tx, k, khash, &key, new_vhash))?; let deleted = self - .todo + .data + .merkle_todo .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)? .is_ok(); if !deleted { debug!( "({}) Item not deleted from Merkle todo because it changed: {:?}", - self.table_name, k + self.data.name, k ); } Ok(()) @@ -197,7 +204,7 @@ impl MerkleUpdater { // should not happen warn!( "({}) Replacing intermediate node with empty node, should not happen.", - self.table_name + self.data.name ); Some(MerkleNode::Empty) } else if children.len() == 1 { @@ -301,7 +308,7 @@ impl MerkleUpdater { // Access a node in the Merkle tree, used by the sync protocol pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result<MerkleNode, Error> { - let ent = self.merkle_tree.get(k.encode())?; + let ent = self.data.merkle_tree.get(k.encode())?; match ent { None => Ok(MerkleNode::Empty), Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), @@ -309,11 +316,11 @@ impl MerkleUpdater { } pub fn merkle_tree_len(&self) -> usize { - self.merkle_tree.len() + self.data.merkle_tree.len() } pub fn todo_len(&self) -> usize { - self.todo.len() + self.data.merkle_todo.len() } } |