diff options
Diffstat (limited to 'src/table/data.rs')
-rw-r--r-- | src/table/data.rs | 187 |
1 files changed, 120 insertions, 67 deletions
diff --git a/src/table/data.rs b/src/table/data.rs index ff7965f5..3212e82b 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -1,13 +1,15 @@ use core::borrow::Borrow; +use std::convert::TryInto; use std::sync::Arc; use serde_bytes::ByteBuf; -use sled::Transactional; use tokio::sync::Notify; +use garage_db as db; +use garage_db::counted_tree_hack::CountedTree; + use garage_util::data::*; use garage_util::error::*; -use garage_util::sled_counter::SledCountedTree; use garage_rpc::system::System; @@ -16,19 +18,20 @@ use crate::gc::GcTodoEntry; use crate::metrics::*; use crate::replication::*; use crate::schema::*; +use crate::util::*; pub struct TableData<F: TableSchema, R: TableReplication> { system: Arc<System>, - pub(crate) instance: F, - pub(crate) replication: R, + pub instance: F, + pub replication: R, - pub store: sled::Tree, + pub store: db::Tree, - pub(crate) merkle_tree: sled::Tree, - pub(crate) merkle_todo: sled::Tree, + pub(crate) merkle_tree: db::Tree, + pub(crate) merkle_todo: db::Tree, pub(crate) merkle_todo_notify: Notify, - pub(crate) gc_todo: SledCountedTree, + pub(crate) gc_todo: CountedTree, pub(crate) metrics: TableMetrics, } @@ -38,7 +41,7 @@ where F: TableSchema, R: TableReplication, { - pub fn new(system: Arc<System>, instance: F, replication: R, db: &sled::Db) -> Arc<Self> { + pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> { let store = db .open_tree(&format!("{}:table", F::TABLE_NAME)) .expect("Unable to open DB tree"); @@ -53,7 +56,7 @@ where let gc_todo = db .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) .expect("Unable to open DB tree"); - let gc_todo = SledCountedTree::new(gc_todo); + let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2"); let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone()); @@ -83,18 +86,48 @@ where pub fn read_range( &self, - p: &F::P, - s: &Option<F::S>, + partition_key: &F::P, + start: &Option<F::S>, + filter: &Option<F::Filter>, + limit: usize, + enumeration_order: EnumerationOrder, + ) -> Result<Vec<Arc<ByteBuf>>, Error> { + let partition_hash = partition_key.hash(); + match enumeration_order { + EnumerationOrder::Forward => { + let first_key = match start { + None => partition_hash.to_vec(), + Some(sk) => self.tree_key(partition_key, sk), + }; + let range = self.store.range(first_key..)?; + self.read_range_aux(partition_hash, range, filter, limit) + } + EnumerationOrder::Reverse => match start { + Some(sk) => { + let last_key = self.tree_key(partition_key, sk); + let range = self.store.range_rev(..=last_key)?; + self.read_range_aux(partition_hash, range, filter, limit) + } + None => { + let mut last_key = partition_hash.to_vec(); + let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap()); + last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1)); + let range = self.store.range_rev(..last_key)?; + self.read_range_aux(partition_hash, range, filter, limit) + } + }, + } + } + + fn read_range_aux<'a>( + &self, + partition_hash: Hash, + range: db::ValueIter<'a>, filter: &Option<F::Filter>, limit: usize, ) -> Result<Vec<Arc<ByteBuf>>, Error> { - let partition_hash = p.hash(); - let first_key = match s { - None => partition_hash.to_vec(), - Some(sk) => self.tree_key(p, sk), - }; let mut ret = vec![]; - for item in self.store.range(first_key..) { + for item in range { let (key, value) = item?; if &key[..32] != partition_hash.as_slice() { break; @@ -107,7 +140,7 @@ where } }; if keep { - ret.push(Arc::new(ByteBuf::from(value.as_ref()))); + ret.push(Arc::new(ByteBuf::from(value))); } if ret.len() >= limit { break; @@ -136,17 +169,29 @@ where let update = self.decode_entry(update_bytes)?; let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - let (old_entry, old_bytes, new_entry) = match store.get(&tree_key)? { + self.update_entry_with(&tree_key[..], |ent| match ent { + Some(mut ent) => { + ent.merge(&update); + ent + } + None => update.clone(), + })?; + Ok(()) + } + + pub fn update_entry_with( + &self, + tree_key: &[u8], + f: impl Fn(Option<F::E>) -> F::E, + ) -> Result<Option<F::E>, Error> { + let changed = self.store.db().transaction(|mut tx| { + let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? { Some(old_bytes) => { - let old_entry = self - .decode_entry(&old_bytes) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let mut new_entry = old_entry.clone(); - new_entry.merge(&update); + let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; + let new_entry = f(Some(old_entry.clone())); (Some(old_entry), Some(old_bytes), new_entry) } - None => (None, None, update.clone()), + None => (None, None, f(None)), }; // Scenario 1: the value changed, so of course there is a change @@ -158,24 +203,28 @@ where // the associated Merkle tree entry. let new_bytes = rmp_to_vec_all_named(&new_entry) .map_err(Error::RmpEncode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + .map_err(db::TxError::Abort)?; let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]); + drop(old_bytes); if value_changed || encoding_changed { let new_bytes_hash = blake2sum(&new_bytes[..]); - mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?; - store.insert(tree_key.clone(), new_bytes)?; - Ok(Some((old_entry, new_entry, new_bytes_hash))) + tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?; + tx.insert(&self.store, tree_key, new_bytes)?; + + self.instance + .updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?; + + Ok(Some((new_entry, new_bytes_hash))) } else { Ok(None) } })?; - if let Some((old_entry, new_entry, new_bytes_hash)) = changed { + if let Some((new_entry, new_bytes_hash)) = changed { self.metrics.internal_update_counter.add(1); let is_tombstone = new_entry.is_tombstone(); - self.instance.updated(old_entry, Some(new_entry)); self.merkle_todo_notify.notify_one(); if is_tombstone { // We are only responsible for GC'ing this item if we are the @@ -187,31 +236,34 @@ where let pk_hash = Hash::try_from(&tree_key[..32]).unwrap(); let nodes = self.replication.write_nodes(&pk_hash); if nodes.first() == Some(&self.system.id) { - GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?; + GcTodoEntry::new(tree_key.to_vec(), new_bytes_hash).save(&self.gc_todo)?; } } - } - Ok(()) + Ok(Some(new_entry)) + } else { + Ok(None) + } } pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { - let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { - if cur_v == v { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(true); + let removed = self + .store + .db() + .transaction(|mut tx| match tx.get(&self.store, k)? { + Some(cur_v) if cur_v == v => { + tx.remove(&self.store, k)?; + tx.insert(&self.merkle_todo, k, vec![])?; + + let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?; + self.instance.updated(&mut tx, Some(&old_entry), None)?; + Ok(true) } - } - Ok(false) - })?; + _ => Ok(false), + })?; if removed { self.metrics.internal_delete_counter.add(1); - - let old_entry = self.decode_entry(v)?; - self.instance.updated(Some(old_entry), None); self.merkle_todo_notify.notify_one(); } Ok(removed) @@ -222,36 +274,37 @@ where k: &[u8], vhash: Hash, ) -> Result<bool, Error> { - let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { - if blake2sum(&cur_v[..]) == vhash { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(Some(cur_v)); + let removed = self + .store + .db() + .transaction(|mut tx| match tx.get(&self.store, k)? { + Some(cur_v) if blake2sum(&cur_v[..]) == vhash => { + tx.remove(&self.store, k)?; + tx.insert(&self.merkle_todo, k, vec![])?; + + let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?; + self.instance.updated(&mut tx, Some(&old_entry), None)?; + Ok(true) } - } - Ok(None) - })?; + _ => Ok(false), + })?; - if let Some(old_v) = removed { - let old_entry = self.decode_entry(&old_v[..])?; - self.instance.updated(Some(old_entry), None); + if removed { + self.metrics.internal_delete_counter.add(1); self.merkle_todo_notify.notify_one(); - Ok(true) - } else { - Ok(false) } + Ok(removed) } // ---- Utility functions ---- - pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { + pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { let mut ret = p.hash().to_vec(); ret.extend(s.sort_key()); ret } - pub(crate) fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> { + pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> { match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { Ok(x) => Ok(x), Err(e) => match F::try_migrate(bytes) { @@ -267,7 +320,7 @@ where } } - pub fn gc_todo_len(&self) -> usize { - self.gc_todo.len() + pub fn gc_todo_len(&self) -> Result<usize, Error> { + Ok(self.gc_todo.len()) } } |