aboutsummaryrefslogtreecommitdiff
path: root/src/table/data.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/data.rs')
-rw-r--r--src/table/data.rs136
1 files changed, 65 insertions, 71 deletions
diff --git a/src/table/data.rs b/src/table/data.rs
index 0029b936..9aa2a3bc 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -4,62 +4,59 @@ use std::sync::Arc;
use log::warn;
use serde_bytes::ByteBuf;
use sled::Transactional;
+use tokio::sync::Notify;
-use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::*;
use crate::crdt::CRDT;
-use crate::merkle::*;
+use crate::replication::*;
use crate::schema::*;
-pub struct TableData<F: TableSchema> {
+pub struct TableData<F: TableSchema, R: TableReplication> {
pub name: String,
- pub instance: F,
+
+ pub(crate) instance: F,
+ pub(crate) replication: R,
pub store: sled::Tree,
+
+ pub(crate) merkle_tree: sled::Tree,
+ pub(crate) merkle_todo: sled::Tree,
+ pub(crate) merkle_todo_notify: Notify,
pub(crate) gc_todo: sled::Tree,
- pub merkle_updater: Arc<MerkleUpdater>,
}
-impl<F> TableData<F>
+impl<F, R> TableData<F, R>
where
F: TableSchema,
+ R: TableReplication,
{
- pub fn new(
- name: String,
- instance: F,
- db: &sled::Db,
- background: Arc<BackgroundRunner>,
- ) -> Arc<Self> {
+ pub fn new(name: String, instance: F, replication: R, db: &sled::Db) -> Arc<Self> {
let store = db
.open_tree(&format!("{}:table", name))
.expect("Unable to open DB tree");
- let merkle_todo_store = db
- .open_tree(&format!("{}:merkle_todo", name))
- .expect("Unable to open DB Merkle TODO tree");
- let merkle_tree_store = db
+ let merkle_tree = db
.open_tree(&format!("{}:merkle_tree", name))
.expect("Unable to open DB Merkle tree tree");
+ let merkle_todo = db
+ .open_tree(&format!("{}:merkle_todo", name))
+ .expect("Unable to open DB Merkle TODO tree");
let gc_todo = db
.open_tree(&format!("{}:gc_todo", name))
.expect("Unable to open DB tree");
- let merkle_updater = MerkleUpdater::launch(
- name.clone(),
- background,
- merkle_todo_store,
- merkle_tree_store,
- );
-
Arc::new(Self {
name,
instance,
+ replication,
store,
+ merkle_tree,
+ merkle_todo,
+ merkle_todo_notify: Notify::new(),
gc_todo,
- merkle_updater,
})
}
@@ -129,37 +126,36 @@ where
let update = self.decode_entry(update_bytes)?;
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
- let changed =
- (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
- let (old_entry, new_entry) = match store.get(&tree_key)? {
- Some(prev_bytes) => {
- let old_entry = self
- .decode_entry(&prev_bytes)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- let mut new_entry = old_entry.clone();
- new_entry.merge(&update);
- (Some(old_entry), new_entry)
- }
- None => (None, update.clone()),
- };
-
- if Some(&new_entry) != old_entry.as_ref() {
- let new_bytes = rmp_to_vec_all_named(&new_entry)
- .map_err(Error::RMPEncode)
+ let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
+ let (old_entry, new_entry) = match store.get(&tree_key)? {
+ Some(prev_bytes) => {
+ let old_entry = self
+ .decode_entry(&prev_bytes)
.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- let new_bytes_hash = blake2sum(&new_bytes[..]);
- mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?;
- store.insert(tree_key.clone(), new_bytes)?;
- Ok(Some((old_entry, new_entry, new_bytes_hash)))
- } else {
- Ok(None)
+ let mut new_entry = old_entry.clone();
+ new_entry.merge(&update);
+ (Some(old_entry), new_entry)
}
- })?;
+ None => (None, update.clone()),
+ };
+
+ if Some(&new_entry) != old_entry.as_ref() {
+ let new_bytes = rmp_to_vec_all_named(&new_entry)
+ .map_err(Error::RMPEncode)
+ .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ let new_bytes_hash = blake2sum(&new_bytes[..]);
+ mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?;
+ store.insert(tree_key.clone(), new_bytes)?;
+ Ok(Some((old_entry, new_entry, new_bytes_hash)))
+ } else {
+ Ok(None)
+ }
+ })?;
if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
let is_tombstone = new_entry.is_tombstone();
self.instance.updated(old_entry, Some(new_entry));
- self.merkle_updater.todo_notify.notify_one();
+ self.merkle_todo_notify.notify_one();
if is_tombstone {
self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?;
}
@@ -169,22 +165,21 @@ where
}
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
- let removed =
- (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
- if let Some(cur_v) = store.get(k)? {
- if cur_v == v {
- store.remove(k)?;
- mkl_todo.insert(k, vec![])?;
- return Ok(true);
- }
+ let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
+ if let Some(cur_v) = store.get(k)? {
+ if cur_v == v {
+ store.remove(k)?;
+ mkl_todo.insert(k, vec![])?;
+ return Ok(true);
}
- Ok(false)
- })?;
+ }
+ Ok(false)
+ })?;
if removed {
let old_entry = self.decode_entry(v)?;
self.instance.updated(Some(old_entry), None);
- self.merkle_updater.todo_notify.notify_one();
+ self.merkle_todo_notify.notify_one();
}
Ok(removed)
}
@@ -194,22 +189,21 @@ where
k: &[u8],
vhash: Hash,
) -> Result<bool, Error> {
- let removed =
- (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
- if let Some(cur_v) = store.get(k)? {
- if blake2sum(&cur_v[..]) == vhash {
- store.remove(k)?;
- mkl_todo.insert(k, vec![])?;
- return Ok(Some(cur_v));
- }
+ let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
+ if let Some(cur_v) = store.get(k)? {
+ if blake2sum(&cur_v[..]) == vhash {
+ store.remove(k)?;
+ mkl_todo.insert(k, vec![])?;
+ return Ok(Some(cur_v));
}
- Ok(None)
- })?;
+ }
+ Ok(None)
+ })?;
if let Some(old_v) = removed {
let old_entry = self.decode_entry(&old_v[..])?;
self.instance.updated(Some(old_entry), None);
- self.merkle_updater.todo_notify.notify_one();
+ self.merkle_todo_notify.notify_one();
Ok(true)
} else {
Ok(false)