aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r--src/table/data.rs6
-rw-r--r--src/table/gc.rs24
-rw-r--r--src/table/merkle.rs8
-rw-r--r--src/table/metrics.rs21
4 files changed, 28 insertions, 31 deletions
diff --git a/src/table/data.rs b/src/table/data.rs
index 7f6b7847..09f4e008 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -6,7 +6,6 @@ use serde_bytes::ByteBuf;
use tokio::sync::Notify;
use garage_db as db;
-use garage_db::counted_tree_hack::CountedTree;
use garage_util::data::*;
use garage_util::error::*;
@@ -36,7 +35,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) insert_queue: db::Tree,
pub(crate) insert_queue_notify: Arc<Notify>,
- pub(crate) gc_todo: CountedTree,
+ pub(crate) gc_todo: db::Tree,
pub(crate) metrics: TableMetrics,
}
@@ -61,7 +60,6 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
let gc_todo = db
.open_tree(format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open GC DB tree");
- let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2");
let metrics = TableMetrics::new(
F::TABLE_NAME,
@@ -370,6 +368,6 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
}
pub fn gc_todo_len(&self) -> Result<usize, Error> {
- Ok(self.gc_todo.len())
+ Ok(self.gc_todo.len()?)
}
}
diff --git a/src/table/gc.rs b/src/table/gc.rs
index ef788749..d30a1849 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -10,7 +10,7 @@ use serde_bytes::ByteBuf;
use futures::future::join_all;
use tokio::sync::watch;
-use garage_db::counted_tree_hack::CountedTree;
+use garage_db as db;
use garage_util::background::*;
use garage_util::data::*;
@@ -334,9 +334,9 @@ impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> {
}
}
-/// An entry stored in the gc_todo Sled tree associated with the table
+/// An entry stored in the gc_todo db tree associated with the table
/// Contains helper function for parsing, saving, and removing
-/// such entry in Sled
+/// such entry in the db
///
/// Format of an entry:
/// - key = 8 bytes: timestamp of tombstone
@@ -353,7 +353,7 @@ pub(crate) struct GcTodoEntry {
}
impl GcTodoEntry {
- /// Creates a new GcTodoEntry (not saved in Sled) from its components:
+ /// Creates a new GcTodoEntry (not saved in the db) from its components:
/// the key of an entry in the table, and the hash of the associated
/// serialized value
pub(crate) fn new(key: Vec<u8>, value_hash: Hash) -> Self {
@@ -376,7 +376,7 @@ impl GcTodoEntry {
}
/// Saves the GcTodoEntry in the gc_todo tree
- pub(crate) fn save(&self, gc_todo_tree: &CountedTree) -> Result<(), Error> {
+ pub(crate) fn save(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> {
gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
Ok(())
}
@@ -386,12 +386,14 @@ impl GcTodoEntry {
/// This is usefull to remove a todo entry only under the condition
/// that it has not changed since the time it was read, i.e.
/// what we have to do is still the same
- pub(crate) fn remove_if_equal(&self, gc_todo_tree: &CountedTree) -> Result<(), Error> {
- gc_todo_tree.compare_and_swap::<_, _, &[u8]>(
- &self.todo_table_key(),
- Some(self.value_hash),
- None,
- )?;
+ pub(crate) fn remove_if_equal(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> {
+ gc_todo_tree.db().transaction(|txn| {
+ let key = self.todo_table_key();
+ if txn.get(gc_todo_tree, &key)?.as_deref() == Some(self.value_hash.as_slice()) {
+ txn.remove(gc_todo_tree, &key)?;
+ }
+ Ok(())
+ })?;
Ok(())
}
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 01271c58..596d5805 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -31,14 +31,14 @@ pub struct MerkleUpdater<F: TableSchema, R: TableReplication> {
// - value = the hash of the full serialized item, if present,
// or an empty vec if item is absent (deleted)
// Fields in data:
- // pub(crate) merkle_todo: sled::Tree,
+ // pub(crate) merkle_todo: db::Tree,
// pub(crate) merkle_todo_notify: Notify,
// Content of the merkle tree: items where
// - key = .bytes() for MerkleNodeKey
// - value = serialization of a MerkleNode, assumed to be MerkleNode::empty if not found
// Field in data:
- // pub(crate) merkle_tree: sled::Tree,
+ // pub(crate) merkle_tree: db::Tree,
empty_node_hash: Hash,
}
@@ -291,10 +291,6 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
Ok(self.data.merkle_tree.len()?)
}
- pub fn merkle_tree_fast_len(&self) -> Result<Option<usize>, Error> {
- Ok(self.data.merkle_tree.fast_len()?)
- }
-
pub fn todo_len(&self) -> Result<usize, Error> {
Ok(self.data.merkle_todo.len()?)
}
diff --git a/src/table/metrics.rs b/src/table/metrics.rs
index 8318a84f..7bb0959a 100644
--- a/src/table/metrics.rs
+++ b/src/table/metrics.rs
@@ -1,7 +1,6 @@
use opentelemetry::{global, metrics::*, KeyValue};
use garage_db as db;
-use garage_db::counted_tree_hack::CountedTree;
/// TableMetrics reference all counter used for metrics
pub struct TableMetrics {
@@ -27,7 +26,7 @@ impl TableMetrics {
store: db::Tree,
merkle_tree: db::Tree,
merkle_todo: db::Tree,
- gc_todo: CountedTree,
+ gc_todo: db::Tree,
) -> Self {
let meter = global::meter(table_name);
TableMetrics {
@@ -35,9 +34,9 @@ impl TableMetrics {
.u64_value_observer(
"table.size",
move |observer| {
- if let Ok(Some(v)) = store.fast_len() {
+ if let Ok(value) = store.len() {
observer.observe(
- v as u64,
+ value as u64,
&[KeyValue::new("table_name", table_name)],
);
}
@@ -49,9 +48,9 @@ impl TableMetrics {
.u64_value_observer(
"table.merkle_tree_size",
move |observer| {
- if let Ok(Some(v)) = merkle_tree.fast_len() {
+ if let Ok(value) = merkle_tree.len() {
observer.observe(
- v as u64,
+ value as u64,
&[KeyValue::new("table_name", table_name)],
);
}
@@ -77,10 +76,12 @@ impl TableMetrics {
.u64_value_observer(
"table.gc_todo_queue_length",
move |observer| {
- observer.observe(
- gc_todo.len() as u64,
- &[KeyValue::new("table_name", table_name)],
- );
+ if let Ok(value) = gc_todo.len() {
+ observer.observe(
+ value as u64,
+ &[KeyValue::new("table_name", table_name)],
+ );
+ }
},
)
.with_description("Table garbage collector TODO queue length")