diff options
Diffstat (limited to 'src/table/data.rs')
-rw-r--r-- | src/table/data.rs | 113 |
1 files changed, 59 insertions, 54 deletions
diff --git a/src/table/data.rs b/src/table/data.rs index 5cb10066..3212e82b 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -3,12 +3,13 @@ use std::convert::TryInto; use std::sync::Arc; use serde_bytes::ByteBuf; -use sled::{IVec, Transactional}; use tokio::sync::Notify; +use garage_db as db; +use garage_db::counted_tree_hack::CountedTree; + use garage_util::data::*; use garage_util::error::*; -use garage_util::sled_counter::SledCountedTree; use garage_rpc::system::System; @@ -25,12 +26,12 @@ pub struct TableData<F: TableSchema, R: TableReplication> { pub instance: F, pub replication: R, - pub store: sled::Tree, + pub store: db::Tree, - pub(crate) merkle_tree: sled::Tree, - pub(crate) merkle_todo: sled::Tree, + pub(crate) merkle_tree: db::Tree, + pub(crate) merkle_todo: db::Tree, pub(crate) merkle_todo_notify: Notify, - pub(crate) gc_todo: SledCountedTree, + pub(crate) gc_todo: CountedTree, pub(crate) metrics: TableMetrics, } @@ -40,7 +41,7 @@ where F: TableSchema, R: TableReplication, { - pub fn new(system: Arc<System>, instance: F, replication: R, db: &sled::Db) -> Arc<Self> { + pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> { let store = db .open_tree(&format!("{}:table", F::TABLE_NAME)) .expect("Unable to open DB tree"); @@ -55,7 +56,7 @@ where let gc_todo = db .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) .expect("Unable to open DB tree"); - let gc_todo = SledCountedTree::new(gc_todo); + let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2"); let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone()); @@ -98,30 +99,30 @@ where None => partition_hash.to_vec(), Some(sk) => self.tree_key(partition_key, sk), }; - let range = self.store.range(first_key..); + let range = self.store.range(first_key..)?; self.read_range_aux(partition_hash, range, filter, limit) } EnumerationOrder::Reverse => match start { Some(sk) => { let last_key = self.tree_key(partition_key, sk); - let range = self.store.range(..=last_key).rev(); + let range = self.store.range_rev(..=last_key)?; self.read_range_aux(partition_hash, range, filter, limit) } None => { let mut last_key = partition_hash.to_vec(); let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap()); last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1)); - let range = self.store.range(..last_key).rev(); + let range = self.store.range_rev(..last_key)?; self.read_range_aux(partition_hash, range, filter, limit) } }, } } - fn read_range_aux( + fn read_range_aux<'a>( &self, partition_hash: Hash, - range: impl Iterator<Item = sled::Result<(IVec, IVec)>>, + range: db::ValueIter<'a>, filter: &Option<F::Filter>, limit: usize, ) -> Result<Vec<Arc<ByteBuf>>, Error> { @@ -139,7 +140,7 @@ where } }; if keep { - ret.push(Arc::new(ByteBuf::from(value.as_ref()))); + ret.push(Arc::new(ByteBuf::from(value))); } if ret.len() >= limit { break; @@ -183,12 +184,10 @@ where tree_key: &[u8], f: impl Fn(Option<F::E>) -> F::E, ) -> Result<Option<F::E>, Error> { - let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - let (old_entry, old_bytes, new_entry) = match store.get(tree_key)? { + let changed = self.store.db().transaction(|mut tx| { + let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? { Some(old_bytes) => { - let old_entry = self - .decode_entry(&old_bytes) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; let new_entry = f(Some(old_entry.clone())); (Some(old_entry), Some(old_bytes), new_entry) } @@ -204,24 +203,28 @@ where // the associated Merkle tree entry. let new_bytes = rmp_to_vec_all_named(&new_entry) .map_err(Error::RmpEncode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + .map_err(db::TxError::Abort)?; let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]); + drop(old_bytes); if value_changed || encoding_changed { let new_bytes_hash = blake2sum(&new_bytes[..]); - mkl_todo.insert(tree_key.to_vec(), new_bytes_hash.as_slice())?; - store.insert(tree_key.to_vec(), new_bytes)?; - Ok(Some((old_entry, new_entry, new_bytes_hash))) + tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?; + tx.insert(&self.store, tree_key, new_bytes)?; + + self.instance + .updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?; + + Ok(Some((new_entry, new_bytes_hash))) } else { Ok(None) } })?; - if let Some((old_entry, new_entry, new_bytes_hash)) = changed { + if let Some((new_entry, new_bytes_hash)) = changed { self.metrics.internal_update_counter.add(1); let is_tombstone = new_entry.is_tombstone(); - self.instance.updated(old_entry.as_ref(), Some(&new_entry)); self.merkle_todo_notify.notify_one(); if is_tombstone { // We are only responsible for GC'ing this item if we are the @@ -244,22 +247,23 @@ where } pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { - let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { - if cur_v == v { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(true); + let removed = self + .store + .db() + .transaction(|mut tx| match tx.get(&self.store, k)? { + Some(cur_v) if cur_v == v => { + tx.remove(&self.store, k)?; + tx.insert(&self.merkle_todo, k, vec![])?; + + let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?; + self.instance.updated(&mut tx, Some(&old_entry), None)?; + Ok(true) } - } - Ok(false) - })?; + _ => Ok(false), + })?; if removed { self.metrics.internal_delete_counter.add(1); - - let old_entry = self.decode_entry(v)?; - self.instance.updated(Some(&old_entry), None); self.merkle_todo_notify.notify_one(); } Ok(removed) @@ -270,25 +274,26 @@ where k: &[u8], vhash: Hash, ) -> Result<bool, Error> { - let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { - if blake2sum(&cur_v[..]) == vhash { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(Some(cur_v)); + let removed = self + .store + .db() + .transaction(|mut tx| match tx.get(&self.store, k)? { + Some(cur_v) if blake2sum(&cur_v[..]) == vhash => { + tx.remove(&self.store, k)?; + tx.insert(&self.merkle_todo, k, vec![])?; + + let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?; + self.instance.updated(&mut tx, Some(&old_entry), None)?; + Ok(true) } - } - Ok(None) - })?; + _ => Ok(false), + })?; - if let Some(old_v) = removed { - let old_entry = self.decode_entry(&old_v[..])?; - self.instance.updated(Some(&old_entry), None); + if removed { + self.metrics.internal_delete_counter.add(1); self.merkle_todo_notify.notify_one(); - Ok(true) - } else { - Ok(false) } + Ok(removed) } // ---- Utility functions ---- @@ -315,7 +320,7 @@ where } } - pub fn gc_todo_len(&self) -> usize { - self.gc_todo.len() + pub fn gc_todo_len(&self) -> Result<usize, Error> { + Ok(self.gc_todo.len()) } } |