diff options
author | Alex Auvolat <alex@adnab.me> | 2022-02-24 14:59:49 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-03-14 10:54:25 +0100 |
commit | 2377a92f6b64165be84f53722f30c21e9c8a0b3e (patch) | |
tree | 4a45feb7bd73bd541cc5f8552423cb9d46c8e26d | |
parent | 203e8d2c345dabee56e31aebf88b706df5aa72e5 (diff) | |
download | garage-2377a92f6b64165be84f53722f30c21e9c8a0b3e.tar.gz garage-2377a92f6b64165be84f53722f30c21e9c8a0b3e.zip |
Add wrapper over sled tree to count items (used for big queues)
-rw-r--r-- | src/model/block.rs | 4 | ||||
-rw-r--r-- | src/model/block_metrics.rs | 4 | ||||
-rw-r--r-- | src/rpc/rpc_helper.rs | 7 | ||||
-rw-r--r-- | src/table/data.rs | 4 | ||||
-rw-r--r-- | src/table/gc.rs | 5 | ||||
-rw-r--r-- | src/table/metrics.rs | 8 | ||||
-rw-r--r-- | src/util/lib.rs | 1 | ||||
-rw-r--r-- | src/util/sled_counter.rs | 92 |
8 files changed, 118 insertions, 7 deletions
diff --git a/src/model/block.rs b/src/model/block.rs index d97e64a8..97e06f0e 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -22,6 +22,7 @@ use opentelemetry::{ use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; +use garage_util::sled_counter::SledCountedTree; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; @@ -155,7 +156,7 @@ pub struct BlockManager { rc: sled::Tree, - resync_queue: sled::Tree, + resync_queue: SledCountedTree, resync_notify: Notify, system: Arc<System>, @@ -184,6 +185,7 @@ impl BlockManager { let resync_queue = db .open_tree("block_local_resync_queue") .expect("Unable to open block_local_resync_queue tree"); + let resync_queue = SledCountedTree::new(resync_queue); let endpoint = system .netapp diff --git a/src/model/block_metrics.rs b/src/model/block_metrics.rs index 7ef9a117..819af241 100644 --- a/src/model/block_metrics.rs +++ b/src/model/block_metrics.rs @@ -1,5 +1,7 @@ use opentelemetry::{global, metrics::*}; +use garage_util::sled_counter::SledCountedTree; + /// TableMetrics reference all counter used for metrics pub struct BlockManagerMetrics { pub(crate) _resync_queue_len: ValueObserver<u64>, @@ -20,7 +22,7 @@ pub struct BlockManagerMetrics { } impl BlockManagerMetrics { - pub fn new(resync_queue: sled::Tree) -> Self { + pub fn new(resync_queue: SledCountedTree) -> Self { let meter = global::meter("garage_model/block"); Self { _resync_queue_len: meter diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 89225511..1b351024 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -134,7 +134,11 @@ impl RpcHelper { M: Rpc<Response = Result<S, Error>>, H: EndpointHandler<M>, { - let metric_tags = [KeyValue::new("rpc_endpoint", endpoint.path().to_string())]; + let metric_tags = [ + KeyValue::new("rpc_endpoint", endpoint.path().to_string()), + KeyValue::new("from", format!("{:?}", self.0.our_node_id)), + KeyValue::new("to", format!("{:?}", to)), + ]; let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32; let permit = self @@ -245,6 +249,7 @@ impl RpcHelper { ) }; let mut span = tracer.start(span_name); + span.set_attribute(KeyValue::new("from", format!("{:?}", self.0.our_node_id))); span.set_attribute(KeyValue::new("to", format!("{:?}", to))); span.set_attribute(KeyValue::new("quorum", quorum as i64)); span.set_attribute(KeyValue::new( diff --git a/src/table/data.rs b/src/table/data.rs index 4293e395..ff7965f5 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -7,6 +7,7 @@ use tokio::sync::Notify; use garage_util::data::*; use garage_util::error::*; +use garage_util::sled_counter::SledCountedTree; use garage_rpc::system::System; @@ -27,7 +28,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> { pub(crate) merkle_tree: sled::Tree, pub(crate) merkle_todo: sled::Tree, pub(crate) merkle_todo_notify: Notify, - pub(crate) gc_todo: sled::Tree, + pub(crate) gc_todo: SledCountedTree, pub(crate) metrics: TableMetrics, } @@ -52,6 +53,7 @@ where let gc_todo = db .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) .expect("Unable to open DB tree"); + let gc_todo = SledCountedTree::new(gc_todo); let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone()); diff --git a/src/table/gc.rs b/src/table/gc.rs index 8d0a5bef..2a05b6ae 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -14,6 +14,7 @@ use tokio::sync::watch; use garage_util::data::*; use garage_util::error::*; +use garage_util::sled_counter::SledCountedTree; use garage_util::time::*; use garage_rpc::system::System; @@ -362,7 +363,7 @@ impl GcTodoEntry { } /// Saves the GcTodoEntry in the gc_todo tree - pub(crate) fn save(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> { + pub(crate) fn save(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> { gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?; Ok(()) } @@ -372,7 +373,7 @@ impl GcTodoEntry { /// This is usefull to remove a todo entry only under the condition /// that it has not changed since the time it was read, i.e. /// what we have to do is still the same - pub(crate) fn remove_if_equal(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> { + pub(crate) fn remove_if_equal(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> { let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>( &self.todo_table_key()[..], Some(self.value_hash), diff --git a/src/table/metrics.rs b/src/table/metrics.rs index 548bf0d6..752a2a6d 100644 --- a/src/table/metrics.rs +++ b/src/table/metrics.rs @@ -1,5 +1,7 @@ use opentelemetry::{global, metrics::*, KeyValue}; +use garage_util::sled_counter::SledCountedTree; + /// TableMetrics reference all counter used for metrics pub struct TableMetrics { pub(crate) _merkle_todo_len: ValueObserver<u64>, @@ -17,7 +19,11 @@ pub struct TableMetrics { pub(crate) sync_items_received: Counter<u64>, } impl TableMetrics { - pub fn new(table_name: &'static str, merkle_todo: sled::Tree, gc_todo: sled::Tree) -> Self { + pub fn new( + table_name: &'static str, + merkle_todo: sled::Tree, + gc_todo: SledCountedTree, + ) -> Self { let meter = global::meter(table_name); TableMetrics { _merkle_todo_len: meter diff --git a/src/util/lib.rs b/src/util/lib.rs index 7ed00034..e83fc2e6 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -10,6 +10,7 @@ pub mod data; pub mod error; pub mod metrics; pub mod persister; +pub mod sled_counter; pub mod time; pub mod token_bucket; pub mod tranquilizer; diff --git a/src/util/sled_counter.rs b/src/util/sled_counter.rs new file mode 100644 index 00000000..8af04f50 --- /dev/null +++ b/src/util/sled_counter.rs @@ -0,0 +1,92 @@ +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +use sled::{CompareAndSwapError, IVec, Iter, Result, Tree}; + +#[derive(Clone)] +pub struct SledCountedTree(Arc<SledCountedTreeInternal>); + +struct SledCountedTreeInternal { + tree: Tree, + len: AtomicUsize, +} + +impl SledCountedTree { + pub fn new(tree: Tree) -> Self { + let len = tree.len(); + Self(Arc::new(SledCountedTreeInternal { + tree, + len: AtomicUsize::new(len), + })) + } + + pub fn len(&self) -> usize { + self.0.len.load(Ordering::Relaxed) + } + + pub fn is_empty(&self) -> bool { + self.0.tree.is_empty() + } + + pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> { + self.0.tree.get(key) + } + + pub fn iter(&self) -> Iter { + self.0.tree.iter() + } + + // ---- writing functions ---- + + pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<IVec>> + where + K: AsRef<[u8]>, + V: Into<IVec>, + { + let res = self.0.tree.insert(key, value); + if res == Ok(None) { + self.0.len.fetch_add(1, Ordering::Relaxed); + } + res + } + + pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> { + let res = self.0.tree.pop_min(); + if let Ok(Some(_)) = &res { + self.0.len.fetch_sub(1, Ordering::Relaxed); + }; + res + } + + pub fn compare_and_swap<K, OV, NV>( + &self, + key: K, + old: Option<OV>, + new: Option<NV>, + ) -> Result<std::result::Result<(), CompareAndSwapError>> + where + K: AsRef<[u8]>, + OV: AsRef<[u8]>, + NV: Into<IVec>, + { + let old_some = old.is_some(); + let new_some = new.is_some(); + + let res = self.0.tree.compare_and_swap(key, old, new); + + if res == Ok(Ok(())) { + match (old_some, new_some) { + (false, true) => { + self.0.len.fetch_add(1, Ordering::Relaxed); + } + (true, false) => { + self.0.len.fetch_sub(1, Ordering::Relaxed); + } + _ => (), + } + } + res + } +} |