diff options
Diffstat (limited to 'src/table/data.rs')
-rw-r--r-- | src/table/data.rs | 136 |
1 files changed, 65 insertions, 71 deletions
diff --git a/src/table/data.rs b/src/table/data.rs index 0029b936..9aa2a3bc 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -4,62 +4,59 @@ use std::sync::Arc; use log::warn; use serde_bytes::ByteBuf; use sled::Transactional; +use tokio::sync::Notify; -use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::*; use crate::crdt::CRDT; -use crate::merkle::*; +use crate::replication::*; use crate::schema::*; -pub struct TableData<F: TableSchema> { +pub struct TableData<F: TableSchema, R: TableReplication> { pub name: String, - pub instance: F, + + pub(crate) instance: F, + pub(crate) replication: R, pub store: sled::Tree, + + pub(crate) merkle_tree: sled::Tree, + pub(crate) merkle_todo: sled::Tree, + pub(crate) merkle_todo_notify: Notify, pub(crate) gc_todo: sled::Tree, - pub merkle_updater: Arc<MerkleUpdater>, } -impl<F> TableData<F> +impl<F, R> TableData<F, R> where F: TableSchema, + R: TableReplication, { - pub fn new( - name: String, - instance: F, - db: &sled::Db, - background: Arc<BackgroundRunner>, - ) -> Arc<Self> { + pub fn new(name: String, instance: F, replication: R, db: &sled::Db) -> Arc<Self> { let store = db .open_tree(&format!("{}:table", name)) .expect("Unable to open DB tree"); - let merkle_todo_store = db - .open_tree(&format!("{}:merkle_todo", name)) - .expect("Unable to open DB Merkle TODO tree"); - let merkle_tree_store = db + let merkle_tree = db .open_tree(&format!("{}:merkle_tree", name)) .expect("Unable to open DB Merkle tree tree"); + let merkle_todo = db + .open_tree(&format!("{}:merkle_todo", name)) + .expect("Unable to open DB Merkle TODO tree"); let gc_todo = db .open_tree(&format!("{}:gc_todo", name)) .expect("Unable to open DB tree"); - let merkle_updater = MerkleUpdater::launch( - name.clone(), - background, - merkle_todo_store, - merkle_tree_store, - ); - Arc::new(Self { name, instance, + replication, store, + merkle_tree, + merkle_todo, + merkle_todo_notify: Notify::new(), gc_todo, - merkle_updater, }) } @@ -129,37 +126,36 @@ where let update = self.decode_entry(update_bytes)?; let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - let changed = - (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { - let (old_entry, new_entry) = match store.get(&tree_key)? { - Some(prev_bytes) => { - let old_entry = self - .decode_entry(&prev_bytes) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let mut new_entry = old_entry.clone(); - new_entry.merge(&update); - (Some(old_entry), new_entry) - } - None => (None, update.clone()), - }; - - if Some(&new_entry) != old_entry.as_ref() { - let new_bytes = rmp_to_vec_all_named(&new_entry) - .map_err(Error::RMPEncode) + let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { + let (old_entry, new_entry) = match store.get(&tree_key)? { + Some(prev_bytes) => { + let old_entry = self + .decode_entry(&prev_bytes) .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let new_bytes_hash = blake2sum(&new_bytes[..]); - mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?; - store.insert(tree_key.clone(), new_bytes)?; - Ok(Some((old_entry, new_entry, new_bytes_hash))) - } else { - Ok(None) + let mut new_entry = old_entry.clone(); + new_entry.merge(&update); + (Some(old_entry), new_entry) } - })?; + None => (None, update.clone()), + }; + + if Some(&new_entry) != old_entry.as_ref() { + let new_bytes = rmp_to_vec_all_named(&new_entry) + .map_err(Error::RMPEncode) + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + let new_bytes_hash = blake2sum(&new_bytes[..]); + mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?; + store.insert(tree_key.clone(), new_bytes)?; + Ok(Some((old_entry, new_entry, new_bytes_hash))) + } else { + Ok(None) + } + })?; if let Some((old_entry, new_entry, new_bytes_hash)) = changed { let is_tombstone = new_entry.is_tombstone(); self.instance.updated(old_entry, Some(new_entry)); - self.merkle_updater.todo_notify.notify_one(); + self.merkle_todo_notify.notify_one(); if is_tombstone { self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?; } @@ -169,22 +165,21 @@ where } pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { - let removed = - (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { - if cur_v == v { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(true); - } + let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { + if let Some(cur_v) = store.get(k)? { + if cur_v == v { + store.remove(k)?; + mkl_todo.insert(k, vec![])?; + return Ok(true); } - Ok(false) - })?; + } + Ok(false) + })?; if removed { let old_entry = self.decode_entry(v)?; self.instance.updated(Some(old_entry), None); - self.merkle_updater.todo_notify.notify_one(); + self.merkle_todo_notify.notify_one(); } Ok(removed) } @@ -194,22 +189,21 @@ where k: &[u8], vhash: Hash, ) -> Result<bool, Error> { - let removed = - (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { - if blake2sum(&cur_v[..]) == vhash { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(Some(cur_v)); - } + let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { + if let Some(cur_v) = store.get(k)? { + if blake2sum(&cur_v[..]) == vhash { + store.remove(k)?; + mkl_todo.insert(k, vec![])?; + return Ok(Some(cur_v)); } - Ok(None) - })?; + } + Ok(None) + })?; if let Some(old_v) = removed { let old_entry = self.decode_entry(&old_v[..])?; self.instance.updated(Some(old_entry), None); - self.merkle_updater.todo_notify.notify_one(); + self.merkle_todo_notify.notify_one(); Ok(true) } else { Ok(false) |