diff options
Diffstat (limited to 'src/table/data.rs')
-rw-r--r-- | src/table/data.rs | 63 |
1 files changed, 32 insertions, 31 deletions
diff --git a/src/table/data.rs b/src/table/data.rs index 5cb10066..ebfae551 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -3,12 +3,12 @@ use std::convert::TryInto; use std::sync::Arc; use serde_bytes::ByteBuf; -use sled::{IVec, Transactional}; use tokio::sync::Notify; +use garage_db as db; + use garage_util::data::*; use garage_util::error::*; -use garage_util::sled_counter::SledCountedTree; use garage_rpc::system::System; @@ -25,12 +25,12 @@ pub struct TableData<F: TableSchema, R: TableReplication> { pub instance: F, pub replication: R, - pub store: sled::Tree, + pub store: db::Tree, - pub(crate) merkle_tree: sled::Tree, - pub(crate) merkle_todo: sled::Tree, + pub(crate) merkle_tree: db::Tree, + pub(crate) merkle_todo: db::Tree, pub(crate) merkle_todo_notify: Notify, - pub(crate) gc_todo: SledCountedTree, + pub(crate) gc_todo: db::Tree, pub(crate) metrics: TableMetrics, } @@ -40,7 +40,7 @@ where F: TableSchema, R: TableReplication, { - pub fn new(system: Arc<System>, instance: F, replication: R, db: &sled::Db) -> Arc<Self> { + pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> { let store = db .open_tree(&format!("{}:table", F::TABLE_NAME)) .expect("Unable to open DB tree"); @@ -55,7 +55,6 @@ where let gc_todo = db .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) .expect("Unable to open DB tree"); - let gc_todo = SledCountedTree::new(gc_todo); let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone()); @@ -98,30 +97,30 @@ where None => partition_hash.to_vec(), Some(sk) => self.tree_key(partition_key, sk), }; - let range = self.store.range(first_key..); + let range = self.store.range(first_key..)?; self.read_range_aux(partition_hash, range, filter, limit) } EnumerationOrder::Reverse => match start { Some(sk) => { let last_key = self.tree_key(partition_key, sk); - let range = self.store.range(..=last_key).rev(); + let range = self.store.range_rev(..=last_key)?; self.read_range_aux(partition_hash, range, filter, limit) } None => { let mut last_key = partition_hash.to_vec(); let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap()); last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1)); - let range = self.store.range(..last_key).rev(); + let range = self.store.range_rev(..last_key)?; self.read_range_aux(partition_hash, range, filter, limit) } }, } } - fn read_range_aux( + fn read_range_aux<'a>( &self, partition_hash: Hash, - range: impl Iterator<Item = sled::Result<(IVec, IVec)>>, + range: db::ValueIter<'a>, filter: &Option<F::Filter>, limit: usize, ) -> Result<Vec<Arc<ByteBuf>>, Error> { @@ -183,12 +182,10 @@ where tree_key: &[u8], f: impl Fn(Option<F::E>) -> F::E, ) -> Result<Option<F::E>, Error> { - let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - let (old_entry, old_bytes, new_entry) = match store.get(tree_key)? { + let changed = self.store.db().transaction(|tx| { + let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? { Some(old_bytes) => { - let old_entry = self - .decode_entry(&old_bytes) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; let new_entry = f(Some(old_entry.clone())); (Some(old_entry), Some(old_bytes), new_entry) } @@ -204,13 +201,17 @@ where // the associated Merkle tree entry. let new_bytes = rmp_to_vec_all_named(&new_entry) .map_err(Error::RmpEncode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + .map_err(db::TxError::Abort)?; let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]); if value_changed || encoding_changed { let new_bytes_hash = blake2sum(&new_bytes[..]); - mkl_todo.insert(tree_key.to_vec(), new_bytes_hash.as_slice())?; - store.insert(tree_key.to_vec(), new_bytes)?; + tx.insert( + &self.merkle_todo, + tree_key.to_vec(), + new_bytes_hash.as_slice(), + )?; + tx.insert(&self.store, tree_key.to_vec(), new_bytes)?; Ok(Some((old_entry, new_entry, new_bytes_hash))) } else { Ok(None) @@ -244,11 +245,11 @@ where } pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { - let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { + let removed = self.store.db().transaction(|tx| { + if let Some(cur_v) = tx.get(&self.store, k)? { if cur_v == v { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; + tx.remove(&self.store, k)?; + tx.insert(&self.merkle_todo, k, vec![])?; return Ok(true); } } @@ -270,12 +271,12 @@ where k: &[u8], vhash: Hash, ) -> Result<bool, Error> { - let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { + let removed = self.store.db().transaction(|tx| { + if let Some(cur_v) = tx.get(&self.store, k)? { if blake2sum(&cur_v[..]) == vhash { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(Some(cur_v)); + tx.remove(&self.store, k)?; + tx.insert(&self.merkle_todo, k, vec![])?; + return Ok(Some(cur_v.into_owned())); } } Ok(None) @@ -316,6 +317,6 @@ where } pub fn gc_todo_len(&self) -> usize { - self.gc_todo.len() + self.gc_todo.len().unwrap() // TODO fix unwrap } } |