aboutsummaryrefslogtreecommitdiff
path: root/src/table/data.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-06-06 15:46:00 +0200
committerAlex Auvolat <alex@adnab.me>2022-06-06 15:46:00 +0200
commit18978153585d870d567c5ad7d9e9c96d3b65a884 (patch)
tree1b3757c260fc6db9596aa53e979595a2fdece59b /src/table/data.rs
parentc56d858834bbfbe3edea2dc0c825bf3b5ce51c98 (diff)
downloadgarage-18978153585d870d567c5ad7d9e9c96d3b65a884.tar.gz
garage-18978153585d870d567c5ad7d9e9c96d3b65a884.zip
Table updated trigger now happens in transaction, this is waaaay better!
Diffstat (limited to 'src/table/data.rs')
-rw-r--r--src/table/data.rs68
1 files changed, 37 insertions, 31 deletions
diff --git a/src/table/data.rs b/src/table/data.rs
index 6352ac24..e688168f 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -209,17 +209,20 @@ where
let new_bytes_hash = blake2sum(&new_bytes[..]);
tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?;
tx.insert(&self.store, tree_key, new_bytes)?;
- Ok(Some((old_entry, new_entry, new_bytes_hash)))
+
+ self.instance
+ .updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?;
+
+ Ok(Some((new_entry, new_bytes_hash)))
} else {
Ok(None)
}
})?;
- if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
+ if let Some((new_entry, new_bytes_hash)) = changed {
self.metrics.internal_update_counter.add(1);
let is_tombstone = new_entry.is_tombstone();
- self.instance.updated(old_entry.as_ref(), Some(&new_entry));
self.merkle_todo_notify.notify_one();
if is_tombstone {
// We are only responsible for GC'ing this item if we are the
@@ -242,20 +245,23 @@ where
}
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
- let removed = self.store.db().transaction(|mut tx| {
- let remove = matches!(tx.get(&self.store, k)?, Some(cur_v) if cur_v == v);
- if remove {
- tx.remove(&self.store, k)?;
- tx.insert(&self.merkle_todo, k, vec![])?;
- }
- Ok(remove)
- })?;
+ let removed = self
+ .store
+ .db()
+ .transaction(|mut tx| match tx.get(&self.store, k)? {
+ Some(cur_v) if cur_v == v => {
+ tx.remove(&self.store, k)?;
+ tx.insert(&self.merkle_todo, k, vec![])?;
+
+ let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
+ self.instance.updated(&mut tx, Some(&old_entry), None)?;
+ Ok(true)
+ }
+ _ => Ok(false),
+ })?;
if removed {
self.metrics.internal_delete_counter.add(1);
-
- let old_entry = self.decode_entry(v)?;
- self.instance.updated(Some(&old_entry), None);
self.merkle_todo_notify.notify_one();
}
Ok(removed)
@@ -266,26 +272,26 @@ where
k: &[u8],
vhash: Hash,
) -> Result<bool, Error> {
- let removed = self.store.db().transaction(|mut tx| {
- let remove_v = match tx.get(&self.store, k)? {
- Some(cur_v) if blake2sum(&cur_v[..]) == vhash => Some(cur_v),
- _ => None,
- };
- if remove_v.is_some() {
- tx.remove(&self.store, k)?;
- tx.insert(&self.merkle_todo, k, vec![])?;
- }
- Ok(remove_v)
- })?;
+ let removed = self
+ .store
+ .db()
+ .transaction(|mut tx| match tx.get(&self.store, k)? {
+ Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
+ tx.remove(&self.store, k)?;
+ tx.insert(&self.merkle_todo, k, vec![])?;
+
+ let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
+ self.instance.updated(&mut tx, Some(&old_entry), None)?;
+ Ok(true)
+ }
+ _ => Ok(false),
+ })?;
- if let Some(old_v) = removed {
- let old_entry = self.decode_entry(&old_v[..])?;
- self.instance.updated(Some(&old_entry), None);
+ if removed {
+ self.metrics.internal_delete_counter.add(1);
self.merkle_todo_notify.notify_one();
- Ok(true)
- } else {
- Ok(false)
}
+ Ok(removed)
}
// ---- Utility functions ----