aboutsummaryrefslogtreecommitdiff
path: root/src/table/data.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/data.rs')
-rw-r--r--src/table/data.rs63
1 files changed, 32 insertions, 31 deletions
diff --git a/src/table/data.rs b/src/table/data.rs
index 5cb10066..ebfae551 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -3,12 +3,12 @@ use std::convert::TryInto;
use std::sync::Arc;
use serde_bytes::ByteBuf;
-use sled::{IVec, Transactional};
use tokio::sync::Notify;
+use garage_db as db;
+
use garage_util::data::*;
use garage_util::error::*;
-use garage_util::sled_counter::SledCountedTree;
use garage_rpc::system::System;
@@ -25,12 +25,12 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub instance: F,
pub replication: R,
- pub store: sled::Tree,
+ pub store: db::Tree,
- pub(crate) merkle_tree: sled::Tree,
- pub(crate) merkle_todo: sled::Tree,
+ pub(crate) merkle_tree: db::Tree,
+ pub(crate) merkle_todo: db::Tree,
pub(crate) merkle_todo_notify: Notify,
- pub(crate) gc_todo: SledCountedTree,
+ pub(crate) gc_todo: db::Tree,
pub(crate) metrics: TableMetrics,
}
@@ -40,7 +40,7 @@ where
F: TableSchema,
R: TableReplication,
{
- pub fn new(system: Arc<System>, instance: F, replication: R, db: &sled::Db) -> Arc<Self> {
+ pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> {
let store = db
.open_tree(&format!("{}:table", F::TABLE_NAME))
.expect("Unable to open DB tree");
@@ -55,7 +55,6 @@ where
let gc_todo = db
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open DB tree");
- let gc_todo = SledCountedTree::new(gc_todo);
let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone());
@@ -98,30 +97,30 @@ where
None => partition_hash.to_vec(),
Some(sk) => self.tree_key(partition_key, sk),
};
- let range = self.store.range(first_key..);
+ let range = self.store.range(first_key..)?;
self.read_range_aux(partition_hash, range, filter, limit)
}
EnumerationOrder::Reverse => match start {
Some(sk) => {
let last_key = self.tree_key(partition_key, sk);
- let range = self.store.range(..=last_key).rev();
+ let range = self.store.range_rev(..=last_key)?;
self.read_range_aux(partition_hash, range, filter, limit)
}
None => {
let mut last_key = partition_hash.to_vec();
let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap());
last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1));
- let range = self.store.range(..last_key).rev();
+ let range = self.store.range_rev(..last_key)?;
self.read_range_aux(partition_hash, range, filter, limit)
}
},
}
}
- fn read_range_aux(
+ fn read_range_aux<'a>(
&self,
partition_hash: Hash,
- range: impl Iterator<Item = sled::Result<(IVec, IVec)>>,
+ range: db::ValueIter<'a>,
filter: &Option<F::Filter>,
limit: usize,
) -> Result<Vec<Arc<ByteBuf>>, Error> {
@@ -183,12 +182,10 @@ where
tree_key: &[u8],
f: impl Fn(Option<F::E>) -> F::E,
) -> Result<Option<F::E>, Error> {
- let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
- let (old_entry, old_bytes, new_entry) = match store.get(tree_key)? {
+ let changed = self.store.db().transaction(|tx| {
+ let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? {
Some(old_bytes) => {
- let old_entry = self
- .decode_entry(&old_bytes)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
let new_entry = f(Some(old_entry.clone()));
(Some(old_entry), Some(old_bytes), new_entry)
}
@@ -204,13 +201,17 @@ where
// the associated Merkle tree entry.
let new_bytes = rmp_to_vec_all_named(&new_entry)
.map_err(Error::RmpEncode)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ .map_err(db::TxError::Abort)?;
let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]);
if value_changed || encoding_changed {
let new_bytes_hash = blake2sum(&new_bytes[..]);
- mkl_todo.insert(tree_key.to_vec(), new_bytes_hash.as_slice())?;
- store.insert(tree_key.to_vec(), new_bytes)?;
+ tx.insert(
+ &self.merkle_todo,
+ tree_key.to_vec(),
+ new_bytes_hash.as_slice(),
+ )?;
+ tx.insert(&self.store, tree_key.to_vec(), new_bytes)?;
Ok(Some((old_entry, new_entry, new_bytes_hash)))
} else {
Ok(None)
@@ -244,11 +245,11 @@ where
}
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
- let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
- if let Some(cur_v) = store.get(k)? {
+ let removed = self.store.db().transaction(|tx| {
+ if let Some(cur_v) = tx.get(&self.store, k)? {
if cur_v == v {
- store.remove(k)?;
- mkl_todo.insert(k, vec![])?;
+ tx.remove(&self.store, k)?;
+ tx.insert(&self.merkle_todo, k, vec![])?;
return Ok(true);
}
}
@@ -270,12 +271,12 @@ where
k: &[u8],
vhash: Hash,
) -> Result<bool, Error> {
- let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
- if let Some(cur_v) = store.get(k)? {
+ let removed = self.store.db().transaction(|tx| {
+ if let Some(cur_v) = tx.get(&self.store, k)? {
if blake2sum(&cur_v[..]) == vhash {
- store.remove(k)?;
- mkl_todo.insert(k, vec![])?;
- return Ok(Some(cur_v));
+ tx.remove(&self.store, k)?;
+ tx.insert(&self.merkle_todo, k, vec![])?;
+ return Ok(Some(cur_v.into_owned()));
}
}
Ok(None)
@@ -316,6 +317,6 @@ where
}
pub fn gc_todo_len(&self) -> usize {
- self.gc_todo.len()
+ self.gc_todo.len().unwrap() // TODO fix unwrap
}
}