aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-02-24 14:59:49 +0100
committerAlex Auvolat <alex@adnab.me>2022-03-14 10:54:25 +0100
commit2377a92f6b64165be84f53722f30c21e9c8a0b3e (patch)
tree4a45feb7bd73bd541cc5f8552423cb9d46c8e26d
parent203e8d2c345dabee56e31aebf88b706df5aa72e5 (diff)
downloadgarage-2377a92f6b64165be84f53722f30c21e9c8a0b3e.tar.gz
garage-2377a92f6b64165be84f53722f30c21e9c8a0b3e.zip
Add wrapper over sled tree to count items (used for big queues)
-rw-r--r--src/model/block.rs4
-rw-r--r--src/model/block_metrics.rs4
-rw-r--r--src/rpc/rpc_helper.rs7
-rw-r--r--src/table/data.rs4
-rw-r--r--src/table/gc.rs5
-rw-r--r--src/table/metrics.rs8
-rw-r--r--src/util/lib.rs1
-rw-r--r--src/util/sled_counter.rs92
8 files changed, 118 insertions, 7 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index d97e64a8..97e06f0e 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -22,6 +22,7 @@ use opentelemetry::{
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
+use garage_util::sled_counter::SledCountedTree;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
@@ -155,7 +156,7 @@ pub struct BlockManager {
rc: sled::Tree,
- resync_queue: sled::Tree,
+ resync_queue: SledCountedTree,
resync_notify: Notify,
system: Arc<System>,
@@ -184,6 +185,7 @@ impl BlockManager {
let resync_queue = db
.open_tree("block_local_resync_queue")
.expect("Unable to open block_local_resync_queue tree");
+ let resync_queue = SledCountedTree::new(resync_queue);
let endpoint = system
.netapp
diff --git a/src/model/block_metrics.rs b/src/model/block_metrics.rs
index 7ef9a117..819af241 100644
--- a/src/model/block_metrics.rs
+++ b/src/model/block_metrics.rs
@@ -1,5 +1,7 @@
use opentelemetry::{global, metrics::*};
+use garage_util::sled_counter::SledCountedTree;
+
/// TableMetrics reference all counter used for metrics
pub struct BlockManagerMetrics {
pub(crate) _resync_queue_len: ValueObserver<u64>,
@@ -20,7 +22,7 @@ pub struct BlockManagerMetrics {
}
impl BlockManagerMetrics {
- pub fn new(resync_queue: sled::Tree) -> Self {
+ pub fn new(resync_queue: SledCountedTree) -> Self {
let meter = global::meter("garage_model/block");
Self {
_resync_queue_len: meter
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 89225511..1b351024 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -134,7 +134,11 @@ impl RpcHelper {
M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>,
{
- let metric_tags = [KeyValue::new("rpc_endpoint", endpoint.path().to_string())];
+ let metric_tags = [
+ KeyValue::new("rpc_endpoint", endpoint.path().to_string()),
+ KeyValue::new("from", format!("{:?}", self.0.our_node_id)),
+ KeyValue::new("to", format!("{:?}", to)),
+ ];
let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
let permit = self
@@ -245,6 +249,7 @@ impl RpcHelper {
)
};
let mut span = tracer.start(span_name);
+ span.set_attribute(KeyValue::new("from", format!("{:?}", self.0.our_node_id)));
span.set_attribute(KeyValue::new("to", format!("{:?}", to)));
span.set_attribute(KeyValue::new("quorum", quorum as i64));
span.set_attribute(KeyValue::new(
diff --git a/src/table/data.rs b/src/table/data.rs
index 4293e395..ff7965f5 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -7,6 +7,7 @@ use tokio::sync::Notify;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::sled_counter::SledCountedTree;
use garage_rpc::system::System;
@@ -27,7 +28,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) merkle_tree: sled::Tree,
pub(crate) merkle_todo: sled::Tree,
pub(crate) merkle_todo_notify: Notify,
- pub(crate) gc_todo: sled::Tree,
+ pub(crate) gc_todo: SledCountedTree,
pub(crate) metrics: TableMetrics,
}
@@ -52,6 +53,7 @@ where
let gc_todo = db
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open DB tree");
+ let gc_todo = SledCountedTree::new(gc_todo);
let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone());
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 8d0a5bef..2a05b6ae 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -14,6 +14,7 @@ use tokio::sync::watch;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::sled_counter::SledCountedTree;
use garage_util::time::*;
use garage_rpc::system::System;
@@ -362,7 +363,7 @@ impl GcTodoEntry {
}
/// Saves the GcTodoEntry in the gc_todo tree
- pub(crate) fn save(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> {
+ pub(crate) fn save(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> {
gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
Ok(())
}
@@ -372,7 +373,7 @@ impl GcTodoEntry {
/// This is usefull to remove a todo entry only under the condition
/// that it has not changed since the time it was read, i.e.
/// what we have to do is still the same
- pub(crate) fn remove_if_equal(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> {
+ pub(crate) fn remove_if_equal(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> {
let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>(
&self.todo_table_key()[..],
Some(self.value_hash),
diff --git a/src/table/metrics.rs b/src/table/metrics.rs
index 548bf0d6..752a2a6d 100644
--- a/src/table/metrics.rs
+++ b/src/table/metrics.rs
@@ -1,5 +1,7 @@
use opentelemetry::{global, metrics::*, KeyValue};
+use garage_util::sled_counter::SledCountedTree;
+
/// TableMetrics reference all counter used for metrics
pub struct TableMetrics {
pub(crate) _merkle_todo_len: ValueObserver<u64>,
@@ -17,7 +19,11 @@ pub struct TableMetrics {
pub(crate) sync_items_received: Counter<u64>,
}
impl TableMetrics {
- pub fn new(table_name: &'static str, merkle_todo: sled::Tree, gc_todo: sled::Tree) -> Self {
+ pub fn new(
+ table_name: &'static str,
+ merkle_todo: sled::Tree,
+ gc_todo: SledCountedTree,
+ ) -> Self {
let meter = global::meter(table_name);
TableMetrics {
_merkle_todo_len: meter
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 7ed00034..e83fc2e6 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -10,6 +10,7 @@ pub mod data;
pub mod error;
pub mod metrics;
pub mod persister;
+pub mod sled_counter;
pub mod time;
pub mod token_bucket;
pub mod tranquilizer;
diff --git a/src/util/sled_counter.rs b/src/util/sled_counter.rs
new file mode 100644
index 00000000..8af04f50
--- /dev/null
+++ b/src/util/sled_counter.rs
@@ -0,0 +1,92 @@
+use std::sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+};
+
+use sled::{CompareAndSwapError, IVec, Iter, Result, Tree};
+
+#[derive(Clone)]
+pub struct SledCountedTree(Arc<SledCountedTreeInternal>);
+
+struct SledCountedTreeInternal {
+ tree: Tree,
+ len: AtomicUsize,
+}
+
+impl SledCountedTree {
+ pub fn new(tree: Tree) -> Self {
+ let len = tree.len();
+ Self(Arc::new(SledCountedTreeInternal {
+ tree,
+ len: AtomicUsize::new(len),
+ }))
+ }
+
+ pub fn len(&self) -> usize {
+ self.0.len.load(Ordering::Relaxed)
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.0.tree.is_empty()
+ }
+
+ pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
+ self.0.tree.get(key)
+ }
+
+ pub fn iter(&self) -> Iter {
+ self.0.tree.iter()
+ }
+
+ // ---- writing functions ----
+
+ pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<IVec>>
+ where
+ K: AsRef<[u8]>,
+ V: Into<IVec>,
+ {
+ let res = self.0.tree.insert(key, value);
+ if res == Ok(None) {
+ self.0.len.fetch_add(1, Ordering::Relaxed);
+ }
+ res
+ }
+
+ pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> {
+ let res = self.0.tree.pop_min();
+ if let Ok(Some(_)) = &res {
+ self.0.len.fetch_sub(1, Ordering::Relaxed);
+ };
+ res
+ }
+
+ pub fn compare_and_swap<K, OV, NV>(
+ &self,
+ key: K,
+ old: Option<OV>,
+ new: Option<NV>,
+ ) -> Result<std::result::Result<(), CompareAndSwapError>>
+ where
+ K: AsRef<[u8]>,
+ OV: AsRef<[u8]>,
+ NV: Into<IVec>,
+ {
+ let old_some = old.is_some();
+ let new_some = new.is_some();
+
+ let res = self.0.tree.compare_and_swap(key, old, new);
+
+ if res == Ok(Ok(())) {
+ match (old_some, new_some) {
+ (false, true) => {
+ self.0.len.fetch_add(1, Ordering::Relaxed);
+ }
+ (true, false) => {
+ self.0.len.fetch_sub(1, Ordering::Relaxed);
+ }
+ _ => (),
+ }
+ }
+ res
+ }
+}