diff options
author | Alex Auvolat <alex@adnab.me> | 2022-06-02 16:58:00 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-06-02 16:58:00 +0200 |
commit | 9f0f5b2e372a720a807914747fd48ddc93928e04 (patch) | |
tree | a4346766c46469758b9138a09c65fa052e9ad253 /src/table | |
parent | 04901093e7315558bdc147d27adc3f56ec2c98a1 (diff) | |
download | garage-9f0f5b2e372a720a807914747fd48ddc93928e04.tar.gz garage-9f0f5b2e372a720a807914747fd48ddc93928e04.zip |
Adapt Garage to use new DB abstraction
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/Cargo.toml | 2 | ||||
-rw-r--r-- | src/table/data.rs | 63 | ||||
-rw-r--r-- | src/table/gc.rs | 30 | ||||
-rw-r--r-- | src/table/merkle.rs | 54 | ||||
-rw-r--r-- | src/table/metrics.rs | 12 | ||||
-rw-r--r-- | src/table/sync.rs | 5 | ||||
-rw-r--r-- | src/table/table.rs | 4 |
7 files changed, 88 insertions, 82 deletions
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 6ae50366..6de37cda 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -26,8 +26,6 @@ hexdump = "0.1" tracing = "0.1.30" rand = "0.8" -sled = "0.34" - rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/table/data.rs b/src/table/data.rs index 5cb10066..ebfae551 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -3,12 +3,12 @@ use std::convert::TryInto; use std::sync::Arc; use serde_bytes::ByteBuf; -use sled::{IVec, Transactional}; use tokio::sync::Notify; +use garage_db as db; + use garage_util::data::*; use garage_util::error::*; -use garage_util::sled_counter::SledCountedTree; use garage_rpc::system::System; @@ -25,12 +25,12 @@ pub struct TableData<F: TableSchema, R: TableReplication> { pub instance: F, pub replication: R, - pub store: sled::Tree, + pub store: db::Tree, - pub(crate) merkle_tree: sled::Tree, - pub(crate) merkle_todo: sled::Tree, + pub(crate) merkle_tree: db::Tree, + pub(crate) merkle_todo: db::Tree, pub(crate) merkle_todo_notify: Notify, - pub(crate) gc_todo: SledCountedTree, + pub(crate) gc_todo: db::Tree, pub(crate) metrics: TableMetrics, } @@ -40,7 +40,7 @@ where F: TableSchema, R: TableReplication, { - pub fn new(system: Arc<System>, instance: F, replication: R, db: &sled::Db) -> Arc<Self> { + pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> { let store = db .open_tree(&format!("{}:table", F::TABLE_NAME)) .expect("Unable to open DB tree"); @@ -55,7 +55,6 @@ where let gc_todo = db .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) .expect("Unable to open DB tree"); - let gc_todo = SledCountedTree::new(gc_todo); let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone()); @@ -98,30 +97,30 @@ where None => partition_hash.to_vec(), Some(sk) => self.tree_key(partition_key, sk), }; - let range = self.store.range(first_key..); + let range = self.store.range(first_key..)?; self.read_range_aux(partition_hash, range, filter, limit) } EnumerationOrder::Reverse => match start { Some(sk) => { let last_key = self.tree_key(partition_key, sk); - let range = self.store.range(..=last_key).rev(); + let range = self.store.range_rev(..=last_key)?; self.read_range_aux(partition_hash, range, filter, limit) } None => { let mut last_key = partition_hash.to_vec(); let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap()); last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1)); - let range = self.store.range(..last_key).rev(); + let range = self.store.range_rev(..last_key)?; self.read_range_aux(partition_hash, range, filter, limit) } }, } } - fn read_range_aux( + fn read_range_aux<'a>( &self, partition_hash: Hash, - range: impl Iterator<Item = sled::Result<(IVec, IVec)>>, + range: db::ValueIter<'a>, filter: &Option<F::Filter>, limit: usize, ) -> Result<Vec<Arc<ByteBuf>>, Error> { @@ -183,12 +182,10 @@ where tree_key: &[u8], f: impl Fn(Option<F::E>) -> F::E, ) -> Result<Option<F::E>, Error> { - let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - let (old_entry, old_bytes, new_entry) = match store.get(tree_key)? { + let changed = self.store.db().transaction(|tx| { + let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? { Some(old_bytes) => { - let old_entry = self - .decode_entry(&old_bytes) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; let new_entry = f(Some(old_entry.clone())); (Some(old_entry), Some(old_bytes), new_entry) } @@ -204,13 +201,17 @@ where // the associated Merkle tree entry. let new_bytes = rmp_to_vec_all_named(&new_entry) .map_err(Error::RmpEncode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + .map_err(db::TxError::Abort)?; let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]); if value_changed || encoding_changed { let new_bytes_hash = blake2sum(&new_bytes[..]); - mkl_todo.insert(tree_key.to_vec(), new_bytes_hash.as_slice())?; - store.insert(tree_key.to_vec(), new_bytes)?; + tx.insert( + &self.merkle_todo, + tree_key.to_vec(), + new_bytes_hash.as_slice(), + )?; + tx.insert(&self.store, tree_key.to_vec(), new_bytes)?; Ok(Some((old_entry, new_entry, new_bytes_hash))) } else { Ok(None) @@ -244,11 +245,11 @@ where } pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { - let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { + let removed = self.store.db().transaction(|tx| { + if let Some(cur_v) = tx.get(&self.store, k)? { if cur_v == v { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; + tx.remove(&self.store, k)?; + tx.insert(&self.merkle_todo, k, vec![])?; return Ok(true); } } @@ -270,12 +271,12 @@ where k: &[u8], vhash: Hash, ) -> Result<bool, Error> { - let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { + let removed = self.store.db().transaction(|tx| { + if let Some(cur_v) = tx.get(&self.store, k)? { if blake2sum(&cur_v[..]) == vhash { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(Some(cur_v)); + tx.remove(&self.store, k)?; + tx.insert(&self.merkle_todo, k, vec![])?; + return Ok(Some(cur_v.into_owned())); } } Ok(None) @@ -316,6 +317,6 @@ where } pub fn gc_todo_len(&self) -> usize { - self.gc_todo.len() + self.gc_todo.len().unwrap() // TODO fix unwrap } } diff --git a/src/table/gc.rs b/src/table/gc.rs index 2a05b6ae..04872a38 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -12,9 +12,10 @@ use futures::select; use futures_util::future::*; use tokio::sync::watch; +use garage_db as db; + use garage_util::data::*; use garage_util::error::*; -use garage_util::sled_counter::SledCountedTree; use garage_util::time::*; use garage_rpc::system::System; @@ -106,7 +107,7 @@ where // List entries in the GC todo list // These entries are put there when a tombstone is inserted in the table // (see update_entry in data.rs) - for entry_kv in self.data.gc_todo.iter() { + for entry_kv in self.data.gc_todo.iter()? { let (k, vhash) = entry_kv?; let mut todo_entry = GcTodoEntry::parse(&k, &vhash); @@ -353,17 +354,17 @@ impl GcTodoEntry { } /// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree - pub(crate) fn parse(sled_k: &[u8], sled_v: &[u8]) -> Self { + pub(crate) fn parse(db_k: &[u8], db_v: &[u8]) -> Self { Self { - tombstone_timestamp: u64::from_be_bytes(sled_k[0..8].try_into().unwrap()), - key: sled_k[8..].to_vec(), - value_hash: Hash::try_from(sled_v).unwrap(), + tombstone_timestamp: u64::from_be_bytes(db_k[0..8].try_into().unwrap()), + key: db_k[8..].to_vec(), + value_hash: Hash::try_from(db_v).unwrap(), value: None, } } /// Saves the GcTodoEntry in the gc_todo tree - pub(crate) fn save(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> { + pub(crate) fn save(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> { gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?; Ok(()) } @@ -373,12 +374,15 @@ impl GcTodoEntry { /// This is usefull to remove a todo entry only under the condition /// that it has not changed since the time it was read, i.e. /// what we have to do is still the same - pub(crate) fn remove_if_equal(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> { - let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>( - &self.todo_table_key()[..], - Some(self.value_hash), - None, - )?; + pub(crate) fn remove_if_equal(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> { + let key = self.todo_table_key(); + gc_todo_tree.db().transaction(|tx| { + let old_val = tx.get(gc_todo_tree, &key)?; + if old_val == Some(self.value_hash.as_slice().into()) { + tx.remove(gc_todo_tree, &key)?; + } + tx.commit(()) + })?; Ok(()) } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 93bf7e47..4b0b44ce 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -4,11 +4,10 @@ use std::time::Duration; use futures::select; use futures_util::future::*; use serde::{Deserialize, Serialize}; -use sled::transaction::{ - ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree, -}; use tokio::sync::watch; +use garage_db as db; + use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; @@ -90,7 +89,8 @@ where async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { while !*must_exit.borrow() { - if let Some(x) = self.data.merkle_todo.iter().next() { + if let Some(x) = self.data.merkle_todo.iter().unwrap().next() { + // TODO unwrap to remove match x { Ok((key, valhash)) => { if let Err(e) = self.update_item(&key[..], &valhash[..]) { @@ -137,13 +137,18 @@ where }; self.data .merkle_tree + .db() .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?; - let deleted = self - .data - .merkle_todo - .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)? - .is_ok(); + let deleted = self.data.merkle_todo.db().transaction(|tx| { + let old_val = tx.get(&self.data.merkle_todo, k)?; + if old_val == Some(vhash_by.into()) { + tx.remove(&self.data.merkle_todo, k)?; + tx.commit(true) + } else { + tx.commit(false) + } + })?; if !deleted { debug!( @@ -157,12 +162,12 @@ where fn update_item_rec( &self, - tx: &TransactionalTree, + tx: db::Transaction<'_>, k: &[u8], khash: &Hash, key: &MerkleNodeKey, new_vhash: Option<Hash>, - ) -> ConflictableTransactionResult<Option<Hash>, Error> { + ) -> db::TxResult<Option<Hash>, Error> { let i = key.prefix.len(); // Read node at current position (defined by the prefix stored in key) @@ -203,7 +208,7 @@ where } MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)), x @ MerkleNode::Leaf(_, _) => { - tx.remove(key_sub.encode())?; + tx.remove(&self.data.merkle_tree, key_sub.encode())?; Some(x) } } @@ -283,28 +288,27 @@ where fn read_node_txn( &self, - tx: &TransactionalTree, + tx: db::Transaction<'_>, k: &MerkleNodeKey, - ) -> ConflictableTransactionResult<MerkleNode, Error> { - let ent = tx.get(k.encode())?; - MerkleNode::decode_opt(ent).map_err(ConflictableTransactionError::Abort) + ) -> db::TxResult<MerkleNode, Error> { + let ent = tx.get(&self.data.merkle_tree, k.encode())?; + MerkleNode::decode_opt(ent).map_err(db::TxError::Abort) } fn put_node_txn( &self, - tx: &TransactionalTree, + tx: db::Transaction<'_>, k: &MerkleNodeKey, v: &MerkleNode, - ) -> ConflictableTransactionResult<Hash, Error> { + ) -> db::TxResult<Hash, Error> { trace!("Put Merkle node: {:?} => {:?}", k, v); if *v == MerkleNode::Empty { - tx.remove(k.encode())?; + tx.remove(&self.data.merkle_tree, k.encode())?; Ok(self.empty_node_hash) } else { - let vby = rmp_to_vec_all_named(v) - .map_err(|e| ConflictableTransactionError::Abort(e.into()))?; + let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?; let rethash = blake2sum(&vby[..]); - tx.insert(k.encode(), vby)?; + tx.insert(&self.data.merkle_tree, k.encode(), vby)?; Ok(rethash) } } @@ -316,11 +320,11 @@ where } pub fn merkle_tree_len(&self) -> usize { - self.data.merkle_tree.len() + self.data.merkle_tree.len().unwrap() // TODO fix unwrap } pub fn todo_len(&self) -> usize { - self.data.merkle_todo.len() + self.data.merkle_todo.len().unwrap() // TODO fix unwrap } } @@ -347,7 +351,7 @@ impl MerkleNodeKey { } impl MerkleNode { - fn decode_opt(ent: Option<sled::IVec>) -> Result<Self, Error> { + fn decode_opt(ent: Option<db::Value<'_>>) -> Result<Self, Error> { match ent { None => Ok(MerkleNode::Empty), Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), diff --git a/src/table/metrics.rs b/src/table/metrics.rs index 752a2a6d..3318de88 100644 --- a/src/table/metrics.rs +++ b/src/table/metrics.rs @@ -1,6 +1,6 @@ use opentelemetry::{global, metrics::*, KeyValue}; -use garage_util::sled_counter::SledCountedTree; +use garage_db as db; /// TableMetrics reference all counter used for metrics pub struct TableMetrics { @@ -19,11 +19,7 @@ pub struct TableMetrics { pub(crate) sync_items_received: Counter<u64>, } impl TableMetrics { - pub fn new( - table_name: &'static str, - merkle_todo: sled::Tree, - gc_todo: SledCountedTree, - ) -> Self { + pub fn new(table_name: &'static str, merkle_todo: db::Tree, gc_todo: db::Tree) -> Self { let meter = global::meter(table_name); TableMetrics { _merkle_todo_len: meter @@ -31,7 +27,7 @@ impl TableMetrics { "table.merkle_updater_todo_queue_length", move |observer| { observer.observe( - merkle_todo.len() as u64, + merkle_todo.len().unwrap() as u64, // TODO fix unwrap &[KeyValue::new("table_name", table_name)], ) }, @@ -43,7 +39,7 @@ impl TableMetrics { "table.gc_todo_queue_length", move |observer| { observer.observe( - gc_todo.len() as u64, + gc_todo.len().unwrap() as u64, // TODO fix unwrap &[KeyValue::new("table_name", table_name)], ) }, diff --git a/src/table/sync.rs b/src/table/sync.rs index 08069ad0..87dfd1d8 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -258,7 +258,7 @@ where while !*must_exit.borrow() { let mut items = Vec::new(); - for item in self.data.store.range(begin.to_vec()..end.to_vec()) { + for item in self.data.store.range(begin.to_vec()..end.to_vec())? { let (key, value) = item?; items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref())))); @@ -603,7 +603,8 @@ impl SyncTodo { let retain = nodes.contains(&my_id); if !retain { // Check if we have some data to send, otherwise skip - if data.store.range(begin..end).next().is_none() { + if data.store.range(begin..end).unwrap().next().is_none() { + // TODO fix unwrap continue; } } diff --git a/src/table/table.rs b/src/table/table.rs index 2a167604..3c211728 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -13,6 +13,8 @@ use opentelemetry::{ Context, }; +use garage_db as db; + use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; @@ -69,7 +71,7 @@ where { // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== - pub fn new(instance: F, replication: R, system: Arc<System>, db: &sled::Db) -> Arc<Self> { + pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> { let endpoint = system .netapp .endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME)); |