aboutsummaryrefslogtreecommitdiff
path: root/src/table/table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/table.rs')
-rw-r--r--src/table/table.rs91
1 files changed, 62 insertions, 29 deletions
diff --git a/src/table/table.rs b/src/table/table.rs
index 366ce925..0e75754c 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -8,6 +8,7 @@ use arc_swap::ArcSwapOption;
use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
+use sled::Transactional;
use garage_util::data::*;
use garage_util::error::Error;
@@ -18,6 +19,7 @@ use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
use crate::crdt::CRDT;
+use crate::merkle::*;
use crate::schema::*;
use crate::table_sync::*;
@@ -33,6 +35,7 @@ pub struct Table<F: TableSchema, R: TableReplication> {
pub system: Arc<System>,
pub store: sled::Tree,
pub syncer: ArcSwapOption<TableSyncer<F, R>>,
+ merkle_updater: Arc<MerkleUpdater>,
}
#[derive(Serialize, Deserialize)]
@@ -77,7 +80,7 @@ where
{
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
- pub async fn new(
+ pub fn new(
instance: F,
replication: R,
system: Arc<System>,
@@ -85,11 +88,27 @@ where
name: String,
rpc_server: &mut RpcServer,
) -> Arc<Self> {
- let store = db.open_tree(&name).expect("Unable to open DB tree");
+ let store = db
+ .open_tree(&format!("{}:table", name))
+ .expect("Unable to open DB tree");
+
+ let merkle_todo_store = db
+ .open_tree(&format!("{}:merkle_todo", name))
+ .expect("Unable to open DB Merkle TODO tree");
+ let merkle_tree_store = db
+ .open_tree(&format!("{}:merkle_tree", name))
+ .expect("Unable to open DB Merkle tree tree");
let rpc_path = format!("table_{}", name);
let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
+ let merkle_updater = MerkleUpdater::new(
+ name.clone(),
+ system.background.clone(),
+ merkle_todo_store,
+ merkle_tree_store,
+ );
+
let table = Arc::new(Self {
instance,
replication,
@@ -98,12 +117,15 @@ where
system,
store,
syncer: ArcSwapOption::from(None),
+ merkle_updater,
});
table.clone().register_handler(rpc_server, rpc_path);
- let syncer = TableSyncer::launch(table.clone()).await;
+ let syncer = TableSyncer::launch(table.clone());
table.syncer.swap(Some(syncer));
+ table.merkle_updater.launch();
+
table
}
@@ -322,7 +344,7 @@ where
Ok(TableRPC::Update(values))
}
TableRPC::Update(pairs) => {
- self.handle_update(pairs).await?;
+ self.handle_update(pairs)?;
Ok(TableRPC::Ok)
}
TableRPC::SyncRPC(rpc) => {
@@ -380,53 +402,64 @@ where
Ok(ret)
}
- pub async fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
- let syncer = self.syncer.load_full().unwrap();
+ // ========== CODE THAT ACTUALLY MODIFIES THE TREE ================
+ pub fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
for update_bytes in entries.iter() {
- let update = self.decode_entry(update_bytes.as_slice())?;
-
- let tree_key = self.tree_key(update.partition_key(), update.sort_key());
-
- let (old_entry, new_entry) = self.store.transaction(|db| {
- let (old_entry, new_entry) = match db.get(&tree_key)? {
- Some(prev_bytes) => {
- let old_entry = self
- .decode_entry(&prev_bytes)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- let mut new_entry = old_entry.clone();
- new_entry.merge(&update);
- (Some(old_entry), new_entry)
- }
- None => (None, update.clone()),
- };
+ self.update_entry(update_bytes.as_slice())?;
+ }
+ Ok(())
+ }
+ pub(crate) fn update_entry(self: &Arc<Self>, update_bytes: &[u8]) -> Result<(), Error> {
+ let update = self.decode_entry(update_bytes)?;
+ let tree_key = self.tree_key(update.partition_key(), update.sort_key());
+
+ let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| {
+ let (old_entry, new_entry) = match db.get(&tree_key)? {
+ Some(prev_bytes) => {
+ let old_entry = self
+ .decode_entry(&prev_bytes)
+ .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ let mut new_entry = old_entry.clone();
+ new_entry.merge(&update);
+ (Some(old_entry), new_entry)
+ }
+ None => (None, update.clone()),
+ };
+
+ if Some(&new_entry) != old_entry.as_ref() {
let new_bytes = rmp_to_vec_all_named(&new_entry)
.map_err(Error::RMPEncode)
.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?;
db.insert(tree_key.clone(), new_bytes)?;
- Ok((old_entry, new_entry))
- })?;
-
- if old_entry.as_ref() != Some(&new_entry) {
- self.instance.updated(old_entry, Some(new_entry));
- syncer.invalidate(&tree_key[..]);
+ Ok(Some((old_entry, new_entry)))
+ } else {
+ Ok(None)
}
+ })?;
+
+ if let Some((old_entry, new_entry)) = changed {
+ self.instance.updated(old_entry, Some(new_entry));
+ self.syncer.load_full().unwrap().invalidate(&tree_key[..]);
}
Ok(())
}
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
- let removed = self.store.transaction(|txn| {
+ let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| {
if let Some(cur_v) = txn.get(k)? {
if cur_v == v {
txn.remove(k)?;
+ mkl_todo.insert(k, vec![])?;
return Ok(true);
}
}
Ok(false)
})?;
+
if removed {
let old_entry = self.decode_entry(v)?;
self.instance.updated(Some(old_entry), None);