diff options
author | Alex Auvolat <alex@adnab.me> | 2022-02-24 14:59:49 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-03-14 10:54:25 +0100 |
commit | 2377a92f6b64165be84f53722f30c21e9c8a0b3e (patch) | |
tree | 4a45feb7bd73bd541cc5f8552423cb9d46c8e26d /src/util | |
parent | 203e8d2c345dabee56e31aebf88b706df5aa72e5 (diff) | |
download | garage-2377a92f6b64165be84f53722f30c21e9c8a0b3e.tar.gz garage-2377a92f6b64165be84f53722f30c21e9c8a0b3e.zip |
Add wrapper over sled tree to count items (used for big queues)
Diffstat (limited to 'src/util')
-rw-r--r-- | src/util/lib.rs | 1 | ||||
-rw-r--r-- | src/util/sled_counter.rs | 92 |
2 files changed, 93 insertions, 0 deletions
diff --git a/src/util/lib.rs b/src/util/lib.rs index 7ed00034..e83fc2e6 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -10,6 +10,7 @@ pub mod data; pub mod error; pub mod metrics; pub mod persister; +pub mod sled_counter; pub mod time; pub mod token_bucket; pub mod tranquilizer; diff --git a/src/util/sled_counter.rs b/src/util/sled_counter.rs new file mode 100644 index 00000000..8af04f50 --- /dev/null +++ b/src/util/sled_counter.rs @@ -0,0 +1,92 @@ +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +use sled::{CompareAndSwapError, IVec, Iter, Result, Tree}; + +#[derive(Clone)] +pub struct SledCountedTree(Arc<SledCountedTreeInternal>); + +struct SledCountedTreeInternal { + tree: Tree, + len: AtomicUsize, +} + +impl SledCountedTree { + pub fn new(tree: Tree) -> Self { + let len = tree.len(); + Self(Arc::new(SledCountedTreeInternal { + tree, + len: AtomicUsize::new(len), + })) + } + + pub fn len(&self) -> usize { + self.0.len.load(Ordering::Relaxed) + } + + pub fn is_empty(&self) -> bool { + self.0.tree.is_empty() + } + + pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> { + self.0.tree.get(key) + } + + pub fn iter(&self) -> Iter { + self.0.tree.iter() + } + + // ---- writing functions ---- + + pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<IVec>> + where + K: AsRef<[u8]>, + V: Into<IVec>, + { + let res = self.0.tree.insert(key, value); + if res == Ok(None) { + self.0.len.fetch_add(1, Ordering::Relaxed); + } + res + } + + pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> { + let res = self.0.tree.pop_min(); + if let Ok(Some(_)) = &res { + self.0.len.fetch_sub(1, Ordering::Relaxed); + }; + res + } + + pub fn compare_and_swap<K, OV, NV>( + &self, + key: K, + old: Option<OV>, + new: Option<NV>, + ) -> Result<std::result::Result<(), CompareAndSwapError>> + where + K: AsRef<[u8]>, + OV: AsRef<[u8]>, + NV: Into<IVec>, + { + let old_some = old.is_some(); + let new_some = new.is_some(); + + let res = self.0.tree.compare_and_swap(key, old, new); + + if res == Ok(Ok(())) { + match (old_some, new_some) { + (false, true) => { + self.0.len.fetch_add(1, Ordering::Relaxed); + } + (true, false) => { + self.0.len.fetch_sub(1, Ordering::Relaxed); + } + _ => (), + } + } + res + } +} |