aboutsummaryrefslogtreecommitdiff
path: root/src/util
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-02-24 14:59:49 +0100
committerAlex Auvolat <alex@adnab.me>2022-03-14 10:54:25 +0100
commit2377a92f6b64165be84f53722f30c21e9c8a0b3e (patch)
tree4a45feb7bd73bd541cc5f8552423cb9d46c8e26d /src/util
parent203e8d2c345dabee56e31aebf88b706df5aa72e5 (diff)
downloadgarage-2377a92f6b64165be84f53722f30c21e9c8a0b3e.tar.gz
garage-2377a92f6b64165be84f53722f30c21e9c8a0b3e.zip
Add wrapper over sled tree to count items (used for big queues)
Diffstat (limited to 'src/util')
-rw-r--r--src/util/lib.rs1
-rw-r--r--src/util/sled_counter.rs92
2 files changed, 93 insertions, 0 deletions
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 7ed00034..e83fc2e6 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -10,6 +10,7 @@ pub mod data;
pub mod error;
pub mod metrics;
pub mod persister;
+pub mod sled_counter;
pub mod time;
pub mod token_bucket;
pub mod tranquilizer;
diff --git a/src/util/sled_counter.rs b/src/util/sled_counter.rs
new file mode 100644
index 00000000..8af04f50
--- /dev/null
+++ b/src/util/sled_counter.rs
@@ -0,0 +1,92 @@
+use std::sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+};
+
+use sled::{CompareAndSwapError, IVec, Iter, Result, Tree};
+
+#[derive(Clone)]
+pub struct SledCountedTree(Arc<SledCountedTreeInternal>);
+
+struct SledCountedTreeInternal {
+ tree: Tree,
+ len: AtomicUsize,
+}
+
+impl SledCountedTree {
+ pub fn new(tree: Tree) -> Self {
+ let len = tree.len();
+ Self(Arc::new(SledCountedTreeInternal {
+ tree,
+ len: AtomicUsize::new(len),
+ }))
+ }
+
+ pub fn len(&self) -> usize {
+ self.0.len.load(Ordering::Relaxed)
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.0.tree.is_empty()
+ }
+
+ pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
+ self.0.tree.get(key)
+ }
+
+ pub fn iter(&self) -> Iter {
+ self.0.tree.iter()
+ }
+
+ // ---- writing functions ----
+
+ pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<IVec>>
+ where
+ K: AsRef<[u8]>,
+ V: Into<IVec>,
+ {
+ let res = self.0.tree.insert(key, value);
+ if res == Ok(None) {
+ self.0.len.fetch_add(1, Ordering::Relaxed);
+ }
+ res
+ }
+
+ pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> {
+ let res = self.0.tree.pop_min();
+ if let Ok(Some(_)) = &res {
+ self.0.len.fetch_sub(1, Ordering::Relaxed);
+ };
+ res
+ }
+
+ pub fn compare_and_swap<K, OV, NV>(
+ &self,
+ key: K,
+ old: Option<OV>,
+ new: Option<NV>,
+ ) -> Result<std::result::Result<(), CompareAndSwapError>>
+ where
+ K: AsRef<[u8]>,
+ OV: AsRef<[u8]>,
+ NV: Into<IVec>,
+ {
+ let old_some = old.is_some();
+ let new_some = new.is_some();
+
+ let res = self.0.tree.compare_and_swap(key, old, new);
+
+ if res == Ok(Ok(())) {
+ match (old_some, new_some) {
+ (false, true) => {
+ self.0.len.fetch_add(1, Ordering::Relaxed);
+ }
+ (true, false) => {
+ self.0.len.fetch_sub(1, Ordering::Relaxed);
+ }
+ _ => (),
+ }
+ }
+ res
+ }
+}