aboutsummaryrefslogtreecommitdiff
path: root/src/table/data.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/data.rs')
-rw-r--r--src/table/data.rs113
1 files changed, 59 insertions, 54 deletions
diff --git a/src/table/data.rs b/src/table/data.rs
index 5cb10066..3212e82b 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -3,12 +3,13 @@ use std::convert::TryInto;
use std::sync::Arc;
use serde_bytes::ByteBuf;
-use sled::{IVec, Transactional};
use tokio::sync::Notify;
+use garage_db as db;
+use garage_db::counted_tree_hack::CountedTree;
+
use garage_util::data::*;
use garage_util::error::*;
-use garage_util::sled_counter::SledCountedTree;
use garage_rpc::system::System;
@@ -25,12 +26,12 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub instance: F,
pub replication: R,
- pub store: sled::Tree,
+ pub store: db::Tree,
- pub(crate) merkle_tree: sled::Tree,
- pub(crate) merkle_todo: sled::Tree,
+ pub(crate) merkle_tree: db::Tree,
+ pub(crate) merkle_todo: db::Tree,
pub(crate) merkle_todo_notify: Notify,
- pub(crate) gc_todo: SledCountedTree,
+ pub(crate) gc_todo: CountedTree,
pub(crate) metrics: TableMetrics,
}
@@ -40,7 +41,7 @@ where
F: TableSchema,
R: TableReplication,
{
- pub fn new(system: Arc<System>, instance: F, replication: R, db: &sled::Db) -> Arc<Self> {
+ pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> {
let store = db
.open_tree(&format!("{}:table", F::TABLE_NAME))
.expect("Unable to open DB tree");
@@ -55,7 +56,7 @@ where
let gc_todo = db
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open DB tree");
- let gc_todo = SledCountedTree::new(gc_todo);
+ let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2");
let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone());
@@ -98,30 +99,30 @@ where
None => partition_hash.to_vec(),
Some(sk) => self.tree_key(partition_key, sk),
};
- let range = self.store.range(first_key..);
+ let range = self.store.range(first_key..)?;
self.read_range_aux(partition_hash, range, filter, limit)
}
EnumerationOrder::Reverse => match start {
Some(sk) => {
let last_key = self.tree_key(partition_key, sk);
- let range = self.store.range(..=last_key).rev();
+ let range = self.store.range_rev(..=last_key)?;
self.read_range_aux(partition_hash, range, filter, limit)
}
None => {
let mut last_key = partition_hash.to_vec();
let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap());
last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1));
- let range = self.store.range(..last_key).rev();
+ let range = self.store.range_rev(..last_key)?;
self.read_range_aux(partition_hash, range, filter, limit)
}
},
}
}
- fn read_range_aux(
+ fn read_range_aux<'a>(
&self,
partition_hash: Hash,
- range: impl Iterator<Item = sled::Result<(IVec, IVec)>>,
+ range: db::ValueIter<'a>,
filter: &Option<F::Filter>,
limit: usize,
) -> Result<Vec<Arc<ByteBuf>>, Error> {
@@ -139,7 +140,7 @@ where
}
};
if keep {
- ret.push(Arc::new(ByteBuf::from(value.as_ref())));
+ ret.push(Arc::new(ByteBuf::from(value)));
}
if ret.len() >= limit {
break;
@@ -183,12 +184,10 @@ where
tree_key: &[u8],
f: impl Fn(Option<F::E>) -> F::E,
) -> Result<Option<F::E>, Error> {
- let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
- let (old_entry, old_bytes, new_entry) = match store.get(tree_key)? {
+ let changed = self.store.db().transaction(|mut tx| {
+ let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? {
Some(old_bytes) => {
- let old_entry = self
- .decode_entry(&old_bytes)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
let new_entry = f(Some(old_entry.clone()));
(Some(old_entry), Some(old_bytes), new_entry)
}
@@ -204,24 +203,28 @@ where
// the associated Merkle tree entry.
let new_bytes = rmp_to_vec_all_named(&new_entry)
.map_err(Error::RmpEncode)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ .map_err(db::TxError::Abort)?;
let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]);
+ drop(old_bytes);
if value_changed || encoding_changed {
let new_bytes_hash = blake2sum(&new_bytes[..]);
- mkl_todo.insert(tree_key.to_vec(), new_bytes_hash.as_slice())?;
- store.insert(tree_key.to_vec(), new_bytes)?;
- Ok(Some((old_entry, new_entry, new_bytes_hash)))
+ tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?;
+ tx.insert(&self.store, tree_key, new_bytes)?;
+
+ self.instance
+ .updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?;
+
+ Ok(Some((new_entry, new_bytes_hash)))
} else {
Ok(None)
}
})?;
- if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
+ if let Some((new_entry, new_bytes_hash)) = changed {
self.metrics.internal_update_counter.add(1);
let is_tombstone = new_entry.is_tombstone();
- self.instance.updated(old_entry.as_ref(), Some(&new_entry));
self.merkle_todo_notify.notify_one();
if is_tombstone {
// We are only responsible for GC'ing this item if we are the
@@ -244,22 +247,23 @@ where
}
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
- let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
- if let Some(cur_v) = store.get(k)? {
- if cur_v == v {
- store.remove(k)?;
- mkl_todo.insert(k, vec![])?;
- return Ok(true);
+ let removed = self
+ .store
+ .db()
+ .transaction(|mut tx| match tx.get(&self.store, k)? {
+ Some(cur_v) if cur_v == v => {
+ tx.remove(&self.store, k)?;
+ tx.insert(&self.merkle_todo, k, vec![])?;
+
+ let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
+ self.instance.updated(&mut tx, Some(&old_entry), None)?;
+ Ok(true)
}
- }
- Ok(false)
- })?;
+ _ => Ok(false),
+ })?;
if removed {
self.metrics.internal_delete_counter.add(1);
-
- let old_entry = self.decode_entry(v)?;
- self.instance.updated(Some(&old_entry), None);
self.merkle_todo_notify.notify_one();
}
Ok(removed)
@@ -270,25 +274,26 @@ where
k: &[u8],
vhash: Hash,
) -> Result<bool, Error> {
- let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
- if let Some(cur_v) = store.get(k)? {
- if blake2sum(&cur_v[..]) == vhash {
- store.remove(k)?;
- mkl_todo.insert(k, vec![])?;
- return Ok(Some(cur_v));
+ let removed = self
+ .store
+ .db()
+ .transaction(|mut tx| match tx.get(&self.store, k)? {
+ Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
+ tx.remove(&self.store, k)?;
+ tx.insert(&self.merkle_todo, k, vec![])?;
+
+ let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
+ self.instance.updated(&mut tx, Some(&old_entry), None)?;
+ Ok(true)
}
- }
- Ok(None)
- })?;
+ _ => Ok(false),
+ })?;
- if let Some(old_v) = removed {
- let old_entry = self.decode_entry(&old_v[..])?;
- self.instance.updated(Some(&old_entry), None);
+ if removed {
+ self.metrics.internal_delete_counter.add(1);
self.merkle_todo_notify.notify_one();
- Ok(true)
- } else {
- Ok(false)
}
+ Ok(removed)
}
// ---- Utility functions ----
@@ -315,7 +320,7 @@ where
}
}
- pub fn gc_todo_len(&self) -> usize {
- self.gc_todo.len()
+ pub fn gc_todo_len(&self) -> Result<usize, Error> {
+ Ok(self.gc_todo.len())
}
}