diff options
-rw-r--r-- | src/block/manager.rs | 17 | ||||
-rw-r--r-- | src/block/metrics.rs | 12 | ||||
-rw-r--r-- | src/db/counted_tree_hack.rs | 130 | ||||
-rw-r--r-- | src/db/lib.rs | 16 | ||||
-rw-r--r-- | src/db/lmdb_adapter.rs | 10 | ||||
-rw-r--r-- | src/db/sled_adapter.rs | 12 | ||||
-rw-r--r-- | src/db/sqlite_adapter.rs | 43 | ||||
-rw-r--r-- | src/table/data.rs | 6 | ||||
-rw-r--r-- | src/table/gc.rs | 20 | ||||
-rw-r--r-- | src/table/metrics.rs | 13 | ||||
-rw-r--r-- | src/util/sled_counter.rs | 100 |
11 files changed, 215 insertions, 164 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index 8abff1b1..da86a2d5 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -20,6 +20,7 @@ use opentelemetry::{ }; use garage_db as db; +use garage_db::counted_tree_hack::CountedTree; use garage_util::data::*; use garage_util::error::*; @@ -94,9 +95,9 @@ pub struct BlockManager { rc: BlockRc, - resync_queue: db::Tree, + resync_queue: CountedTree, resync_notify: Notify, - resync_errors: db::Tree, + resync_errors: CountedTree, system: Arc<System>, endpoint: Arc<Endpoint<BlockRpc, Self>>, @@ -126,10 +127,14 @@ impl BlockManager { let resync_queue = db .open_tree("block_local_resync_queue") .expect("Unable to open block_local_resync_queue tree"); + let resync_queue = + CountedTree::new(resync_queue).expect("Could not count block_local_resync_queue"); let resync_errors = db .open_tree("block_local_resync_errors") .expect("Unable to open block_local_resync_errors tree"); + let resync_errors = + CountedTree::new(resync_errors).expect("Could not count block_local_resync_errors"); let endpoint = system .netapp @@ -299,12 +304,16 @@ impl BlockManager { /// Get lenght of resync queue pub fn resync_queue_len(&self) -> Result<usize, Error> { - Ok(self.resync_queue.len()?) + // This currently can't return an error because the CountedTree hack + // doesn't error on .len(), but this will change when we remove the hack + // (hopefully someday!) + Ok(self.resync_queue.len()) } /// Get number of blocks that have an error pub fn resync_errors_len(&self) -> Result<usize, Error> { - Ok(self.resync_errors.len()?) + // (see resync_queue_len comment) + Ok(self.resync_errors.len()) } /// Get number of items in the refcount table diff --git a/src/block/metrics.rs b/src/block/metrics.rs index 0bd50a18..1fc0962a 100644 --- a/src/block/metrics.rs +++ b/src/block/metrics.rs @@ -1,6 +1,6 @@ use opentelemetry::{global, metrics::*}; -use garage_db as db; +use garage_db::counted_tree_hack::CountedTree; /// TableMetrics reference all counter used for metrics pub struct BlockManagerMetrics { @@ -23,14 +23,12 @@ pub struct BlockManagerMetrics { } impl BlockManagerMetrics { - pub fn new(resync_queue: db::Tree, resync_errors: db::Tree) -> Self { + pub fn new(resync_queue: CountedTree, resync_errors: CountedTree) -> Self { let meter = global::meter("garage_model/block"); Self { _resync_queue_len: meter .u64_value_observer("block.resync_queue_length", move |observer| { - if let Ok(v) = resync_queue.len() { - observer.observe(v as u64, &[]); - } + observer.observe(resync_queue.len() as u64, &[]); }) .with_description( "Number of block hashes queued for local check and possible resync", @@ -38,9 +36,7 @@ impl BlockManagerMetrics { .init(), _resync_errored_blocks: meter .u64_value_observer("block.resync_errored_blocks", move |observer| { - if let Ok(v) = resync_errors.len() { - observer.observe(v as u64, &[]); - } + observer.observe(resync_errors.len() as u64, &[]); }) .with_description("Number of block hashes whose last resync resulted in an error") .init(), diff --git a/src/db/counted_tree_hack.rs b/src/db/counted_tree_hack.rs new file mode 100644 index 00000000..52893b41 --- /dev/null +++ b/src/db/counted_tree_hack.rs @@ -0,0 +1,130 @@ +//! This hack allows a db tree to keep in RAM a counter of the number of entries +//! it contains, which is used to call .len() on it. This is usefull only for +//! the sled backend where .len() otherwise would have to traverse the whole +//! tree to count items. For sqlite and lmdb, this is mostly useless (but +//! hopefully not harmfull!). Note that a CountedTree cannot be part of a +//! transaction. + +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +use crate::{Result, Tree, TxError, Value, ValueIter}; + +#[derive(Clone)] +pub struct CountedTree(Arc<CountedTreeInternal>); + +struct CountedTreeInternal { + tree: Tree, + len: AtomicUsize, +} + +impl CountedTree { + pub fn new(tree: Tree) -> Result<Self> { + let len = tree.len()?; + Ok(Self(Arc::new(CountedTreeInternal { + tree, + len: AtomicUsize::new(len), + }))) + } + + pub fn len(&self) -> usize { + self.0.len.load(Ordering::Relaxed) + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Value>> { + self.0.tree.get(key) + } + + pub fn first(&self) -> Result<Option<(Value, Value)>> { + self.0.tree.first() + } + + pub fn iter(&self) -> Result<ValueIter<'_>> { + self.0.tree.iter() + } + + // ---- writing functions ---- + + pub fn insert<K, V>(&self, key: K, value: V) -> Result<bool> + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + { + let inserted = self.0.tree.insert(key, value)?; + if inserted { + self.0.len.fetch_add(1, Ordering::Relaxed); + } + Ok(inserted) + } + + pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<bool> { + let removed = self.0.tree.remove(key)?; + if removed { + self.0.len.fetch_sub(1, Ordering::Relaxed); + } + Ok(removed) + } + + /* + pub fn pop_min(&self) -> Result<Option<(Value, Value)>> { + let res = self.0.tree.pop_min(); + if let Ok(Some(_)) = &res { + self.0.len.fetch_sub(1, Ordering::Relaxed); + }; + res + } + */ + + pub fn compare_and_swap<K, OV, NV>( + &self, + key: K, + expected_old: Option<OV>, + new: Option<NV>, + ) -> Result<bool> + where + K: AsRef<[u8]>, + OV: AsRef<[u8]>, + NV: AsRef<[u8]>, + { + let old_some = expected_old.is_some(); + let new_some = new.is_some(); + + match self.0.tree.db().transaction(|mut tx| { + let old_val = tx.get(&self.0.tree, &key)?; + if old_val.as_ref().map(|x| &x[..]) == expected_old.as_ref().map(AsRef::as_ref) { + match &new { + Some(v) => { + tx.insert(&self.0.tree, &key, v)?; + } + None => { + tx.remove(&self.0.tree, &key)?; + } + } + tx.commit(()) + } else { + tx.abort(()) + } + }) { + Ok(()) => { + match (old_some, new_some) { + (false, true) => { + self.0.len.fetch_add(1, Ordering::Relaxed); + } + (true, false) => { + self.0.len.fetch_sub(1, Ordering::Relaxed); + } + _ => (), + } + Ok(true) + } + Err(TxError::Abort(())) => Ok(false), + Err(TxError::Db(e)) => Err(e), + } + } +} diff --git a/src/db/lib.rs b/src/db/lib.rs index afea9f55..4543e53c 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -2,6 +2,8 @@ pub mod lmdb_adapter; pub mod sled_adapter; pub mod sqlite_adapter; +pub mod counted_tree_hack; + #[cfg(test)] pub mod test; @@ -164,10 +166,13 @@ impl Tree { .transpose() } + /// True if item didn't exist before, false if item already existed + /// and was replaced. #[inline] - pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, key: T, value: U) -> Result<()> { + pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, key: T, value: U) -> Result<bool> { self.0.insert(self.1, key.as_ref(), value.as_ref()) } + /// True if item was removed, false if item already didn't exist #[inline] pub fn remove<T: AsRef<[u8]>>(&self, key: T) -> Result<bool> { self.0.remove(self.1, key.as_ref()) @@ -215,15 +220,18 @@ impl<'a> Transaction<'a> { self.0.len(tree.1) } + /// True if item didn't exist before, false if item already existed + /// and was replaced. #[inline] pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>( &mut self, tree: &Tree, key: T, value: U, - ) -> Result<()> { + ) -> Result<bool> { self.0.insert(tree.1, key.as_ref(), value.as_ref()) } + /// True if item was removed, false if item already didn't exist #[inline] pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> Result<bool> { self.0.remove(tree.1, key.as_ref()) @@ -281,7 +289,7 @@ pub(crate) trait IDb: Send + Sync { fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>; fn len(&self, tree: usize) -> Result<usize>; - fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>; + fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<bool>; fn remove(&self, tree: usize, key: &[u8]) -> Result<bool>; fn iter(&self, tree: usize) -> Result<ValueIter<'_>>; @@ -307,7 +315,7 @@ pub(crate) trait ITx { fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>; fn len(&self, tree: usize) -> Result<usize>; - fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>; + fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<bool>; fn remove(&mut self, tree: usize, key: &[u8]) -> Result<bool>; fn iter(&self, tree: usize) -> Result<ValueIter<'_>>; diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs index 3f468128..d8e5bcd3 100644 --- a/src/db/lmdb_adapter.rs +++ b/src/db/lmdb_adapter.rs @@ -122,12 +122,13 @@ impl IDb for LmdbDb { Ok(tree.len(&tx)?.try_into().unwrap()) } - fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { + fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<bool> { let tree = self.get_tree(tree)?; let mut tx = self.db.write_txn()?; + let old_val = tree.get(&tx, key)?.map(Vec::from); tree.put(&mut tx, key, value)?; tx.commit()?; - Ok(()) + Ok(old_val.is_none()) } fn iter(&self, tree: usize) -> Result<ValueIter<'_>> { @@ -221,10 +222,11 @@ impl<'a, 'db> ITx for LmdbTx<'a, 'db> { unimplemented!(".len() in transaction not supported with LMDB backend") } - fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { + fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<bool> { let tree = *self.get_tree(tree)?; + let old_val = tree.get(&self.tx, key)?.map(Vec::from); tree.put(&mut self.tx, key, value)?; - Ok(()) + Ok(old_val.is_none()) } fn remove(&mut self, tree: usize, key: &[u8]) -> Result<bool> { let tree = *self.get_tree(tree)?; diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs index 97fec2c7..18f457c8 100644 --- a/src/db/sled_adapter.rs +++ b/src/db/sled_adapter.rs @@ -93,10 +93,10 @@ impl IDb for SledDb { Ok(tree.len()) } - fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { + fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<bool> { let tree = self.get_tree(tree)?; - tree.insert(key, value)?; - Ok(()) + let old_val = tree.insert(key, value)?; + Ok(old_val.is_none()) } fn iter(&self, tree: usize) -> Result<ValueIter<'_>> { @@ -206,10 +206,10 @@ impl<'a> ITx for SledTx<'a> { unimplemented!(".len() in transaction not supported with Sled backend") } - fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { + fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<bool> { let tree = self.get_tree(tree)?; - self.save_error(tree.insert(key, value))?; - Ok(()) + let old_val = self.save_error(tree.insert(key, value))?; + Ok(old_val.is_none()) } fn remove(&mut self, tree: usize, key: &[u8]) -> Result<bool> { let tree = self.get_tree(tree)?; diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index 0c8a0746..32557a53 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -54,6 +54,17 @@ impl SqliteDbInner { .map(String::as_str) .ok_or_else(|| Error("invalid tree id".into())) } + + fn internal_get(&self, tree: &str, key: &[u8]) -> Result<Option<Value>> { + let mut stmt = self + .db + .prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?; + let mut res_iter = stmt.query([key])?; + match res_iter.next()? { + None => Ok(None), + Some(v) => Ok(Some(v.get::<_, Vec<u8>>(0)?)), + } + } } impl IDb for SqliteDb { @@ -111,15 +122,7 @@ impl IDb for SqliteDb { trace!("get {}: lock acquired", tree); let tree = this.get_tree(tree)?; - - let mut stmt = this - .db - .prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?; - let mut res_iter = stmt.query([key])?; - match res_iter.next()? { - None => Ok(None), - Some(v) => Ok(Some(v.get::<_, Vec<u8>>(0)?)), - } + this.internal_get(tree, key) } fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> { @@ -148,17 +151,18 @@ impl IDb for SqliteDb { } } - fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { + fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<bool> { trace!("insert {}: lock db", tree); let this = self.0.lock().unwrap(); trace!("insert {}: lock acquired", tree); let tree = this.get_tree(tree)?; + let old_val = this.internal_get(tree, key)?; this.db.execute( &format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree), params![key, value], )?; - Ok(()) + Ok(old_val.is_none()) } fn iter(&self, tree: usize) -> Result<ValueIter<'_>> { @@ -276,11 +280,8 @@ impl<'a> SqliteTx<'a> { ) }) } -} -impl<'a> ITx for SqliteTx<'a> { - fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> { - let tree = self.get_tree(tree)?; + fn internal_get(&self, tree: &str, key: &[u8]) -> Result<Option<Value>> { let mut stmt = self .tx .prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?; @@ -290,6 +291,13 @@ impl<'a> ITx for SqliteTx<'a> { Some(v) => Ok(Some(v.get::<_, Vec<u8>>(0)?)), } } +} + +impl<'a> ITx for SqliteTx<'a> { + fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> { + let tree = self.get_tree(tree)?; + self.internal_get(tree, key) + } fn len(&self, tree: usize) -> Result<usize> { let tree = self.get_tree(tree)?; let mut stmt = self.tx.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?; @@ -300,13 +308,14 @@ impl<'a> ITx for SqliteTx<'a> { } } - fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { + fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<bool> { let tree = self.get_tree(tree)?; + let old_val = self.internal_get(tree, key)?; self.tx.execute( &format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree), params![key, value], )?; - Ok(()) + Ok(old_val.is_none()) } fn remove(&mut self, tree: usize, key: &[u8]) -> Result<bool> { let tree = self.get_tree(tree)?; diff --git a/src/table/data.rs b/src/table/data.rs index 839dae94..3212e82b 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -6,6 +6,7 @@ use serde_bytes::ByteBuf; use tokio::sync::Notify; use garage_db as db; +use garage_db::counted_tree_hack::CountedTree; use garage_util::data::*; use garage_util::error::*; @@ -30,7 +31,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> { pub(crate) merkle_tree: db::Tree, pub(crate) merkle_todo: db::Tree, pub(crate) merkle_todo_notify: Notify, - pub(crate) gc_todo: db::Tree, + pub(crate) gc_todo: CountedTree, pub(crate) metrics: TableMetrics, } @@ -55,6 +56,7 @@ where let gc_todo = db .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) .expect("Unable to open DB tree"); + let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2"); let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone()); @@ -319,6 +321,6 @@ where } pub fn gc_todo_len(&self) -> Result<usize, Error> { - Ok(self.gc_todo.len()?) + Ok(self.gc_todo.len()) } } diff --git a/src/table/gc.rs b/src/table/gc.rs index e8843339..e7fbbcb0 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -12,7 +12,7 @@ use futures::select; use futures_util::future::*; use tokio::sync::watch; -use garage_db as db; +use garage_db::counted_tree_hack::CountedTree; use garage_util::data::*; use garage_util::error::*; @@ -370,7 +370,7 @@ impl GcTodoEntry { } /// Saves the GcTodoEntry in the gc_todo tree - pub(crate) fn save(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> { + pub(crate) fn save(&self, gc_todo_tree: &CountedTree) -> Result<(), Error> { gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?; Ok(()) } @@ -380,16 +380,12 @@ impl GcTodoEntry { /// This is usefull to remove a todo entry only under the condition /// that it has not changed since the time it was read, i.e. /// what we have to do is still the same - pub(crate) fn remove_if_equal(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> { - let key = self.todo_table_key(); - gc_todo_tree.db().transaction(|mut tx| { - let remove = - matches!(tx.get(gc_todo_tree, &key)?, Some(ov) if ov == self.value_hash.as_slice()); - if remove { - tx.remove(gc_todo_tree, &key)?; - } - tx.commit(()) - })?; + pub(crate) fn remove_if_equal(&self, gc_todo_tree: &CountedTree) -> Result<(), Error> { + gc_todo_tree.compare_and_swap::<_, _, &[u8]>( + &self.todo_table_key(), + Some(self.value_hash), + None, + )?; Ok(()) } diff --git a/src/table/metrics.rs b/src/table/metrics.rs index 13baf4c6..3a1783e0 100644 --- a/src/table/metrics.rs +++ b/src/table/metrics.rs @@ -1,6 +1,7 @@ use opentelemetry::{global, metrics::*, KeyValue}; use garage_db as db; +use garage_db::counted_tree_hack::CountedTree; /// TableMetrics reference all counter used for metrics pub struct TableMetrics { @@ -19,7 +20,7 @@ pub struct TableMetrics { pub(crate) sync_items_received: Counter<u64>, } impl TableMetrics { - pub fn new(table_name: &'static str, merkle_todo: db::Tree, gc_todo: db::Tree) -> Self { + pub fn new(table_name: &'static str, merkle_todo: db::Tree, gc_todo: CountedTree) -> Self { let meter = global::meter(table_name); TableMetrics { _merkle_todo_len: meter @@ -40,12 +41,10 @@ impl TableMetrics { .u64_value_observer( "table.gc_todo_queue_length", move |observer| { - if let Ok(v) = gc_todo.len() { - observer.observe( - v as u64, - &[KeyValue::new("table_name", table_name)], - ); - } + observer.observe( + gc_todo.len() as u64, + &[KeyValue::new("table_name", table_name)], + ); }, ) .with_description("Table garbage collector TODO queue length") diff --git a/src/util/sled_counter.rs b/src/util/sled_counter.rs deleted file mode 100644 index bc54cea0..00000000 --- a/src/util/sled_counter.rs +++ /dev/null @@ -1,100 +0,0 @@ -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; - -use sled::{CompareAndSwapError, IVec, Iter, Result, Tree}; - -#[derive(Clone)] -pub struct SledCountedTree(Arc<SledCountedTreeInternal>); - -struct SledCountedTreeInternal { - tree: Tree, - len: AtomicUsize, -} - -impl SledCountedTree { - pub fn new(tree: Tree) -> Self { - let len = tree.len(); - Self(Arc::new(SledCountedTreeInternal { - tree, - len: AtomicUsize::new(len), - })) - } - - pub fn len(&self) -> usize { - self.0.len.load(Ordering::Relaxed) - } - - pub fn is_empty(&self) -> bool { - self.0.tree.is_empty() - } - - pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> { - self.0.tree.get(key) - } - - pub fn iter(&self) -> Iter { - self.0.tree.iter() - } - - // ---- writing functions ---- - - pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<IVec>> - where - K: AsRef<[u8]>, - V: Into<IVec>, - { - let res = self.0.tree.insert(key, value); - if res == Ok(None) { - self.0.len.fetch_add(1, Ordering::Relaxed); - } - res - } - - pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> { - let res = self.0.tree.remove(key); - if matches!(res, Ok(Some(_))) { - self.0.len.fetch_sub(1, Ordering::Relaxed); - } - res - } - - pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> { - let res = self.0.tree.pop_min(); - if let Ok(Some(_)) = &res { - self.0.len.fetch_sub(1, Ordering::Relaxed); - }; - res - } - - pub fn compare_and_swap<K, OV, NV>( - &self, - key: K, - old: Option<OV>, - new: Option<NV>, - ) -> Result<std::result::Result<(), CompareAndSwapError>> - where - K: AsRef<[u8]>, - OV: AsRef<[u8]>, - NV: Into<IVec>, - { - let old_some = old.is_some(); - let new_some = new.is_some(); - - let res = self.0.tree.compare_and_swap(key, old, new); - - if res == Ok(Ok(())) { - match (old_some, new_some) { - (false, true) => { - self.0.len.fetch_add(1, Ordering::Relaxed); - } - (true, false) => { - self.0.len.fetch_sub(1, Ordering::Relaxed); - } - _ => (), - } - } - res - } -} |