diff options
Diffstat (limited to 'src/table/table.rs')
-rw-r--r-- | src/table/table.rs | 91 |
1 files changed, 62 insertions, 29 deletions
diff --git a/src/table/table.rs b/src/table/table.rs index 366ce925..0e75754c 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -8,6 +8,7 @@ use arc_swap::ArcSwapOption; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; +use sled::Transactional; use garage_util::data::*; use garage_util::error::Error; @@ -18,6 +19,7 @@ use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use crate::crdt::CRDT; +use crate::merkle::*; use crate::schema::*; use crate::table_sync::*; @@ -33,6 +35,7 @@ pub struct Table<F: TableSchema, R: TableReplication> { pub system: Arc<System>, pub store: sled::Tree, pub syncer: ArcSwapOption<TableSyncer<F, R>>, + merkle_updater: Arc<MerkleUpdater>, } #[derive(Serialize, Deserialize)] @@ -77,7 +80,7 @@ where { // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== - pub async fn new( + pub fn new( instance: F, replication: R, system: Arc<System>, @@ -85,11 +88,27 @@ where name: String, rpc_server: &mut RpcServer, ) -> Arc<Self> { - let store = db.open_tree(&name).expect("Unable to open DB tree"); + let store = db + .open_tree(&format!("{}:table", name)) + .expect("Unable to open DB tree"); + + let merkle_todo_store = db + .open_tree(&format!("{}:merkle_todo", name)) + .expect("Unable to open DB Merkle TODO tree"); + let merkle_tree_store = db + .open_tree(&format!("{}:merkle_tree", name)) + .expect("Unable to open DB Merkle tree tree"); let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path); + let merkle_updater = MerkleUpdater::new( + name.clone(), + system.background.clone(), + merkle_todo_store, + merkle_tree_store, + ); + let table = Arc::new(Self { instance, replication, @@ -98,12 +117,15 @@ where system, store, syncer: ArcSwapOption::from(None), + merkle_updater, }); table.clone().register_handler(rpc_server, rpc_path); - let syncer = TableSyncer::launch(table.clone()).await; + let syncer = TableSyncer::launch(table.clone()); table.syncer.swap(Some(syncer)); + table.merkle_updater.launch(); + table } @@ -322,7 +344,7 @@ where Ok(TableRPC::Update(values)) } TableRPC::Update(pairs) => { - self.handle_update(pairs).await?; + self.handle_update(pairs)?; Ok(TableRPC::Ok) } TableRPC::SyncRPC(rpc) => { @@ -380,53 +402,64 @@ where Ok(ret) } - pub async fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> { - let syncer = self.syncer.load_full().unwrap(); + // ========== CODE THAT ACTUALLY MODIFIES THE TREE ================ + pub fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> { for update_bytes in entries.iter() { - let update = self.decode_entry(update_bytes.as_slice())?; - - let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - - let (old_entry, new_entry) = self.store.transaction(|db| { - let (old_entry, new_entry) = match db.get(&tree_key)? { - Some(prev_bytes) => { - let old_entry = self - .decode_entry(&prev_bytes) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let mut new_entry = old_entry.clone(); - new_entry.merge(&update); - (Some(old_entry), new_entry) - } - None => (None, update.clone()), - }; + self.update_entry(update_bytes.as_slice())?; + } + Ok(()) + } + pub(crate) fn update_entry(self: &Arc<Self>, update_bytes: &[u8]) -> Result<(), Error> { + let update = self.decode_entry(update_bytes)?; + let tree_key = self.tree_key(update.partition_key(), update.sort_key()); + + let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| { + let (old_entry, new_entry) = match db.get(&tree_key)? { + Some(prev_bytes) => { + let old_entry = self + .decode_entry(&prev_bytes) + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + let mut new_entry = old_entry.clone(); + new_entry.merge(&update); + (Some(old_entry), new_entry) + } + None => (None, update.clone()), + }; + + if Some(&new_entry) != old_entry.as_ref() { let new_bytes = rmp_to_vec_all_named(&new_entry) .map_err(Error::RMPEncode) .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?; db.insert(tree_key.clone(), new_bytes)?; - Ok((old_entry, new_entry)) - })?; - - if old_entry.as_ref() != Some(&new_entry) { - self.instance.updated(old_entry, Some(new_entry)); - syncer.invalidate(&tree_key[..]); + Ok(Some((old_entry, new_entry))) + } else { + Ok(None) } + })?; + + if let Some((old_entry, new_entry)) = changed { + self.instance.updated(old_entry, Some(new_entry)); + self.syncer.load_full().unwrap().invalidate(&tree_key[..]); } Ok(()) } pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { - let removed = self.store.transaction(|txn| { + let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| { if let Some(cur_v) = txn.get(k)? { if cur_v == v { txn.remove(k)?; + mkl_todo.insert(k, vec![])?; return Ok(true); } } Ok(false) })?; + if removed { let old_entry = self.decode_entry(v)?; self.instance.updated(Some(old_entry), None); |