aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2022-06-08 10:01:44 +0200
committerAlex <alex@adnab.me>2022-06-08 10:01:44 +0200
commitb44d3fc796484a50cd6854f20c9b46e5fddedc9d (patch)
tree29f6da0e8dc68485edf713aaa7331536f4ff4fde /src/table
parent7eed3ceda9cf964e3435f22fc1852e27f4f5a8ae (diff)
downloadgarage-b44d3fc796484a50cd6854f20c9b46e5fddedc9d.tar.gz
garage-b44d3fc796484a50cd6854f20c9b46e5fddedc9d.zip
Abstract database behind generic interface and implement alternative drivers (#322)
- [x] Design interface - [x] Implement Sled backend - [x] Re-implement the SledCountedTree hack ~~on Sled backend~~ on all backends (i.e. over the abstraction) - [x] Convert Garage code to use generic interface - [x] Proof-read converted Garage code - [ ] Test everything well - [x] Implement sqlite backend - [x] Implement LMDB backend - [ ] (Implement Persy backend?) - [ ] (Implement other backends? (like RocksDB, ...)) - [x] Implement backend choice in config file and garage server module - [x] Add CLI for converting between DB formats - Exploit the new interface to put more things in transactions - [x] `.updated()` trigger on Garage tables Fix #284 **Bugs** - [x] When exporting sqlite, trees iterate empty?? - [x] LMDB doesn't work **Known issues for various back-ends** - Sled: - Eats all my RAM and also all my disk space - `.len()` has to traverse the whole table - Is actually quite slow on some operations - And is actually pretty bad code... - Sqlite: - Requires a lock to be taken on all operations. The lock is also taken when iterating on a table with `.iter()`, and the lock isn't released until the iterator is dropped. This means that we must be VERY carefull to not do anything else inside a `.iter()` loop or else we will have a deadlock! Most such cases have been eliminated from the Garage codebase, but there might still be some that remain. If your Garage-over-Sqlite seems to hang/freeze, this is the reason. - (adapter uses a bunch of unsafe code) - Heed (LMDB): - Not suited for 32-bit machines as it has to map the whole DB in memory. - (adpater uses a tiny bit of unsafe code) **My recommendation:** avoid 32-bit machines and use LMDB as much as possible. **Converting databases** is actually quite easy. For example from Sled to LMDB: ```bash cd src/db cargo run --features cli --bin convert -- -i path/to/garage/meta/db -a sled -o path/to/garage/meta/db.lmdb -b lmdb ``` Then, just add this to your `config.toml`: ```toml db_engine = "lmdb" ``` Co-authored-by: Alex Auvolat <alex@adnab.me> Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/322 Co-authored-by: Alex <alex@adnab.me> Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/table')
-rw-r--r--src/table/Cargo.toml3
-rw-r--r--src/table/data.rs113
-rw-r--r--src/table/gc.rs41
-rw-r--r--src/table/merkle.rs101
-rw-r--r--src/table/metrics.rs21
-rw-r--r--src/table/schema.rs19
-rw-r--r--src/table/sync.rs16
-rw-r--r--src/table/table.rs4
8 files changed, 174 insertions, 144 deletions
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index ed1a213f..6de37cda 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -14,6 +14,7 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+garage_db = { version = "0.8.0", path = "../db" }
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_util = { version = "0.7.0", path = "../util" }
@@ -25,8 +26,6 @@ hexdump = "0.1"
tracing = "0.1.30"
rand = "0.8"
-sled = "0.34"
-
rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/table/data.rs b/src/table/data.rs
index 5cb10066..3212e82b 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -3,12 +3,13 @@ use std::convert::TryInto;
use std::sync::Arc;
use serde_bytes::ByteBuf;
-use sled::{IVec, Transactional};
use tokio::sync::Notify;
+use garage_db as db;
+use garage_db::counted_tree_hack::CountedTree;
+
use garage_util::data::*;
use garage_util::error::*;
-use garage_util::sled_counter::SledCountedTree;
use garage_rpc::system::System;
@@ -25,12 +26,12 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub instance: F,
pub replication: R,
- pub store: sled::Tree,
+ pub store: db::Tree,
- pub(crate) merkle_tree: sled::Tree,
- pub(crate) merkle_todo: sled::Tree,
+ pub(crate) merkle_tree: db::Tree,
+ pub(crate) merkle_todo: db::Tree,
pub(crate) merkle_todo_notify: Notify,
- pub(crate) gc_todo: SledCountedTree,
+ pub(crate) gc_todo: CountedTree,
pub(crate) metrics: TableMetrics,
}
@@ -40,7 +41,7 @@ where
F: TableSchema,
R: TableReplication,
{
- pub fn new(system: Arc<System>, instance: F, replication: R, db: &sled::Db) -> Arc<Self> {
+ pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> {
let store = db
.open_tree(&format!("{}:table", F::TABLE_NAME))
.expect("Unable to open DB tree");
@@ -55,7 +56,7 @@ where
let gc_todo = db
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open DB tree");
- let gc_todo = SledCountedTree::new(gc_todo);
+ let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2");
let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone());
@@ -98,30 +99,30 @@ where
None => partition_hash.to_vec(),
Some(sk) => self.tree_key(partition_key, sk),
};
- let range = self.store.range(first_key..);
+ let range = self.store.range(first_key..)?;
self.read_range_aux(partition_hash, range, filter, limit)
}
EnumerationOrder::Reverse => match start {
Some(sk) => {
let last_key = self.tree_key(partition_key, sk);
- let range = self.store.range(..=last_key).rev();
+ let range = self.store.range_rev(..=last_key)?;
self.read_range_aux(partition_hash, range, filter, limit)
}
None => {
let mut last_key = partition_hash.to_vec();
let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap());
last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1));
- let range = self.store.range(..last_key).rev();
+ let range = self.store.range_rev(..last_key)?;
self.read_range_aux(partition_hash, range, filter, limit)
}
},
}
}
- fn read_range_aux(
+ fn read_range_aux<'a>(
&self,
partition_hash: Hash,
- range: impl Iterator<Item = sled::Result<(IVec, IVec)>>,
+ range: db::ValueIter<'a>,
filter: &Option<F::Filter>,
limit: usize,
) -> Result<Vec<Arc<ByteBuf>>, Error> {
@@ -139,7 +140,7 @@ where
}
};
if keep {
- ret.push(Arc::new(ByteBuf::from(value.as_ref())));
+ ret.push(Arc::new(ByteBuf::from(value)));
}
if ret.len() >= limit {
break;
@@ -183,12 +184,10 @@ where
tree_key: &[u8],
f: impl Fn(Option<F::E>) -> F::E,
) -> Result<Option<F::E>, Error> {
- let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
- let (old_entry, old_bytes, new_entry) = match store.get(tree_key)? {
+ let changed = self.store.db().transaction(|mut tx| {
+ let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? {
Some(old_bytes) => {
- let old_entry = self
- .decode_entry(&old_bytes)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
let new_entry = f(Some(old_entry.clone()));
(Some(old_entry), Some(old_bytes), new_entry)
}
@@ -204,24 +203,28 @@ where
// the associated Merkle tree entry.
let new_bytes = rmp_to_vec_all_named(&new_entry)
.map_err(Error::RmpEncode)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ .map_err(db::TxError::Abort)?;
let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]);
+ drop(old_bytes);
if value_changed || encoding_changed {
let new_bytes_hash = blake2sum(&new_bytes[..]);
- mkl_todo.insert(tree_key.to_vec(), new_bytes_hash.as_slice())?;
- store.insert(tree_key.to_vec(), new_bytes)?;
- Ok(Some((old_entry, new_entry, new_bytes_hash)))
+ tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?;
+ tx.insert(&self.store, tree_key, new_bytes)?;
+
+ self.instance
+ .updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?;
+
+ Ok(Some((new_entry, new_bytes_hash)))
} else {
Ok(None)
}
})?;
- if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
+ if let Some((new_entry, new_bytes_hash)) = changed {
self.metrics.internal_update_counter.add(1);
let is_tombstone = new_entry.is_tombstone();
- self.instance.updated(old_entry.as_ref(), Some(&new_entry));
self.merkle_todo_notify.notify_one();
if is_tombstone {
// We are only responsible for GC'ing this item if we are the
@@ -244,22 +247,23 @@ where
}
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
- let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
- if let Some(cur_v) = store.get(k)? {
- if cur_v == v {
- store.remove(k)?;
- mkl_todo.insert(k, vec![])?;
- return Ok(true);
+ let removed = self
+ .store
+ .db()
+ .transaction(|mut tx| match tx.get(&self.store, k)? {
+ Some(cur_v) if cur_v == v => {
+ tx.remove(&self.store, k)?;
+ tx.insert(&self.merkle_todo, k, vec![])?;
+
+ let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
+ self.instance.updated(&mut tx, Some(&old_entry), None)?;
+ Ok(true)
}
- }
- Ok(false)
- })?;
+ _ => Ok(false),
+ })?;
if removed {
self.metrics.internal_delete_counter.add(1);
-
- let old_entry = self.decode_entry(v)?;
- self.instance.updated(Some(&old_entry), None);
self.merkle_todo_notify.notify_one();
}
Ok(removed)
@@ -270,25 +274,26 @@ where
k: &[u8],
vhash: Hash,
) -> Result<bool, Error> {
- let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
- if let Some(cur_v) = store.get(k)? {
- if blake2sum(&cur_v[..]) == vhash {
- store.remove(k)?;
- mkl_todo.insert(k, vec![])?;
- return Ok(Some(cur_v));
+ let removed = self
+ .store
+ .db()
+ .transaction(|mut tx| match tx.get(&self.store, k)? {
+ Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
+ tx.remove(&self.store, k)?;
+ tx.insert(&self.merkle_todo, k, vec![])?;
+
+ let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
+ self.instance.updated(&mut tx, Some(&old_entry), None)?;
+ Ok(true)
}
- }
- Ok(None)
- })?;
+ _ => Ok(false),
+ })?;
- if let Some(old_v) = removed {
- let old_entry = self.decode_entry(&old_v[..])?;
- self.instance.updated(Some(&old_entry), None);
+ if removed {
+ self.metrics.internal_delete_counter.add(1);
self.merkle_todo_notify.notify_one();
- Ok(true)
- } else {
- Ok(false)
}
+ Ok(removed)
}
// ---- Utility functions ----
@@ -315,7 +320,7 @@ where
}
}
- pub fn gc_todo_len(&self) -> usize {
- self.gc_todo.len()
+ pub fn gc_todo_len(&self) -> Result<usize, Error> {
+ Ok(self.gc_todo.len())
}
}
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 2a05b6ae..e7fbbcb0 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -12,9 +12,10 @@ use futures::select;
use futures_util::future::*;
use tokio::sync::watch;
+use garage_db::counted_tree_hack::CountedTree;
+
use garage_util::data::*;
use garage_util::error::*;
-use garage_util::sled_counter::SledCountedTree;
use garage_util::time::*;
use garage_rpc::system::System;
@@ -100,18 +101,16 @@ where
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
let now = now_msec();
- let mut entries = vec![];
- let mut excluded = vec![];
-
// List entries in the GC todo list
// These entries are put there when a tombstone is inserted in the table
// (see update_entry in data.rs)
- for entry_kv in self.data.gc_todo.iter() {
+ let mut candidates = vec![];
+ for entry_kv in self.data.gc_todo.iter()? {
let (k, vhash) = entry_kv?;
- let mut todo_entry = GcTodoEntry::parse(&k, &vhash);
+ let todo_entry = GcTodoEntry::parse(&k, &vhash);
if todo_entry.deletion_time() > now {
- if entries.is_empty() && excluded.is_empty() {
+ if candidates.is_empty() {
// If the earliest entry in the todo list shouldn't yet be processed,
// return a duration to wait in the loop
return Ok(Some(Duration::from_millis(
@@ -123,15 +122,23 @@ where
}
}
- let vhash = Hash::try_from(&vhash[..]).unwrap();
+ candidates.push(todo_entry);
+ if candidates.len() >= 2 * TABLE_GC_BATCH_SIZE {
+ break;
+ }
+ }
+ let mut entries = vec![];
+ let mut excluded = vec![];
+ for mut todo_entry in candidates {
// Check if the tombstone is still the current value of the entry.
// If not, we don't actually want to GC it, and we will remove it
// from the gc_todo table later (below).
+ let vhash = todo_entry.value_hash;
todo_entry.value = self
.data
.store
- .get(&k[..])?
+ .get(&todo_entry.key[..])?
.filter(|v| blake2sum(&v[..]) == vhash)
.map(|v| v.to_vec());
@@ -353,17 +360,17 @@ impl GcTodoEntry {
}
/// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree
- pub(crate) fn parse(sled_k: &[u8], sled_v: &[u8]) -> Self {
+ pub(crate) fn parse(db_k: &[u8], db_v: &[u8]) -> Self {
Self {
- tombstone_timestamp: u64::from_be_bytes(sled_k[0..8].try_into().unwrap()),
- key: sled_k[8..].to_vec(),
- value_hash: Hash::try_from(sled_v).unwrap(),
+ tombstone_timestamp: u64::from_be_bytes(db_k[0..8].try_into().unwrap()),
+ key: db_k[8..].to_vec(),
+ value_hash: Hash::try_from(db_v).unwrap(),
value: None,
}
}
/// Saves the GcTodoEntry in the gc_todo tree
- pub(crate) fn save(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> {
+ pub(crate) fn save(&self, gc_todo_tree: &CountedTree) -> Result<(), Error> {
gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
Ok(())
}
@@ -373,9 +380,9 @@ impl GcTodoEntry {
/// This is usefull to remove a todo entry only under the condition
/// that it has not changed since the time it was read, i.e.
/// what we have to do is still the same
- pub(crate) fn remove_if_equal(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> {
- let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>(
- &self.todo_table_key()[..],
+ pub(crate) fn remove_if_equal(&self, gc_todo_tree: &CountedTree) -> Result<(), Error> {
+ gc_todo_tree.compare_and_swap::<_, _, &[u8]>(
+ &self.todo_table_key(),
Some(self.value_hash),
None,
)?;
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 93bf7e47..7685b193 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -4,11 +4,10 @@ use std::time::Duration;
use futures::select;
use futures_util::future::*;
use serde::{Deserialize, Serialize};
-use sled::transaction::{
- ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree,
-};
use tokio::sync::watch;
+use garage_db as db;
+
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
@@ -90,35 +89,35 @@ where
async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() {
- if let Some(x) = self.data.merkle_todo.iter().next() {
- match x {
- Ok((key, valhash)) => {
- if let Err(e) = self.update_item(&key[..], &valhash[..]) {
- warn!(
- "({}) Error while updating Merkle tree item: {}",
- F::TABLE_NAME,
- e
- );
- }
- }
- Err(e) => {
- warn!(
- "({}) Error while iterating on Merkle todo tree: {}",
- F::TABLE_NAME,
- e
- );
- tokio::time::sleep(Duration::from_secs(10)).await;
+ match self.updater_loop_iter() {
+ Ok(true) => (),
+ Ok(false) => {
+ select! {
+ _ = self.data.merkle_todo_notify.notified().fuse() => {},
+ _ = must_exit.changed().fuse() => {},
}
}
- } else {
- select! {
- _ = self.data.merkle_todo_notify.notified().fuse() => {},
- _ = must_exit.changed().fuse() => {},
+ Err(e) => {
+ warn!(
+ "({}) Error while updating Merkle tree item: {}",
+ F::TABLE_NAME,
+ e
+ );
+ tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
}
+ fn updater_loop_iter(&self) -> Result<bool, Error> {
+ if let Some((key, valhash)) = self.data.merkle_todo.first()? {
+ self.update_item(&key, &valhash)?;
+ Ok(true)
+ } else {
+ Ok(false)
+ }
+ }
+
fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> {
let khash = blake2sum(k);
@@ -137,13 +136,16 @@ where
};
self.data
.merkle_tree
- .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?;
+ .db()
+ .transaction(|mut tx| self.update_item_rec(&mut tx, k, &khash, &key, new_vhash))?;
- let deleted = self
- .data
- .merkle_todo
- .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)?
- .is_ok();
+ let deleted = self.data.merkle_todo.db().transaction(|mut tx| {
+ let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by);
+ if remove {
+ tx.remove(&self.data.merkle_todo, k)?;
+ }
+ Ok(remove)
+ })?;
if !deleted {
debug!(
@@ -157,12 +159,12 @@ where
fn update_item_rec(
&self,
- tx: &TransactionalTree,
+ tx: &mut db::Transaction<'_>,
k: &[u8],
khash: &Hash,
key: &MerkleNodeKey,
new_vhash: Option<Hash>,
- ) -> ConflictableTransactionResult<Option<Hash>, Error> {
+ ) -> db::TxResult<Option<Hash>, Error> {
let i = key.prefix.len();
// Read node at current position (defined by the prefix stored in key)
@@ -203,7 +205,7 @@ where
}
MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)),
x @ MerkleNode::Leaf(_, _) => {
- tx.remove(key_sub.encode())?;
+ tx.remove(&self.data.merkle_tree, key_sub.encode())?;
Some(x)
}
}
@@ -283,28 +285,27 @@ where
fn read_node_txn(
&self,
- tx: &TransactionalTree,
+ tx: &mut db::Transaction<'_>,
k: &MerkleNodeKey,
- ) -> ConflictableTransactionResult<MerkleNode, Error> {
- let ent = tx.get(k.encode())?;
- MerkleNode::decode_opt(ent).map_err(ConflictableTransactionError::Abort)
+ ) -> db::TxResult<MerkleNode, Error> {
+ let ent = tx.get(&self.data.merkle_tree, k.encode())?;
+ MerkleNode::decode_opt(&ent).map_err(db::TxError::Abort)
}
fn put_node_txn(
&self,
- tx: &TransactionalTree,
+ tx: &mut db::Transaction<'_>,
k: &MerkleNodeKey,
v: &MerkleNode,
- ) -> ConflictableTransactionResult<Hash, Error> {
+ ) -> db::TxResult<Hash, Error> {
trace!("Put Merkle node: {:?} => {:?}", k, v);
if *v == MerkleNode::Empty {
- tx.remove(k.encode())?;
+ tx.remove(&self.data.merkle_tree, k.encode())?;
Ok(self.empty_node_hash)
} else {
- let vby = rmp_to_vec_all_named(v)
- .map_err(|e| ConflictableTransactionError::Abort(e.into()))?;
+ let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?;
let rethash = blake2sum(&vby[..]);
- tx.insert(k.encode(), vby)?;
+ tx.insert(&self.data.merkle_tree, k.encode(), vby)?;
Ok(rethash)
}
}
@@ -312,15 +313,15 @@ where
// Access a node in the Merkle tree, used by the sync protocol
pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result<MerkleNode, Error> {
let ent = self.data.merkle_tree.get(k.encode())?;
- MerkleNode::decode_opt(ent)
+ MerkleNode::decode_opt(&ent)
}
- pub fn merkle_tree_len(&self) -> usize {
- self.data.merkle_tree.len()
+ pub fn merkle_tree_len(&self) -> Result<usize, Error> {
+ Ok(self.data.merkle_tree.len()?)
}
- pub fn todo_len(&self) -> usize {
- self.data.merkle_todo.len()
+ pub fn todo_len(&self) -> Result<usize, Error> {
+ Ok(self.data.merkle_todo.len()?)
}
}
@@ -347,7 +348,7 @@ impl MerkleNodeKey {
}
impl MerkleNode {
- fn decode_opt(ent: Option<sled::IVec>) -> Result<Self, Error> {
+ fn decode_opt(ent: &Option<db::Value>) -> Result<Self, Error> {
match ent {
None => Ok(MerkleNode::Empty),
Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),
diff --git a/src/table/metrics.rs b/src/table/metrics.rs
index 752a2a6d..3a1783e0 100644
--- a/src/table/metrics.rs
+++ b/src/table/metrics.rs
@@ -1,6 +1,7 @@
use opentelemetry::{global, metrics::*, KeyValue};
-use garage_util::sled_counter::SledCountedTree;
+use garage_db as db;
+use garage_db::counted_tree_hack::CountedTree;
/// TableMetrics reference all counter used for metrics
pub struct TableMetrics {
@@ -19,21 +20,19 @@ pub struct TableMetrics {
pub(crate) sync_items_received: Counter<u64>,
}
impl TableMetrics {
- pub fn new(
- table_name: &'static str,
- merkle_todo: sled::Tree,
- gc_todo: SledCountedTree,
- ) -> Self {
+ pub fn new(table_name: &'static str, merkle_todo: db::Tree, gc_todo: CountedTree) -> Self {
let meter = global::meter(table_name);
TableMetrics {
_merkle_todo_len: meter
.u64_value_observer(
"table.merkle_updater_todo_queue_length",
move |observer| {
- observer.observe(
- merkle_todo.len() as u64,
- &[KeyValue::new("table_name", table_name)],
- )
+ if let Ok(v) = merkle_todo.len() {
+ observer.observe(
+ v as u64,
+ &[KeyValue::new("table_name", table_name)],
+ );
+ }
},
)
.with_description("Merkle tree updater TODO queue length")
@@ -45,7 +44,7 @@ impl TableMetrics {
observer.observe(
gc_todo.len() as u64,
&[KeyValue::new("table_name", table_name)],
- )
+ );
},
)
.with_description("Table garbage collector TODO queue length")
diff --git a/src/table/schema.rs b/src/table/schema.rs
index 37327037..74f57798 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -1,5 +1,6 @@
use serde::{Deserialize, Serialize};
+use garage_db as db;
use garage_util::data::*;
use crate::crdt::Crdt;
@@ -82,11 +83,19 @@ pub trait TableSchema: Send + Sync {
None
}
- // Updated triggers some stuff downstream, but it is not supposed to block or fail,
- // as the update itself is an unchangeable fact that will never go back
- // due to CRDT logic. Typically errors in propagation of info should be logged
- // to stderr.
- fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {}
+ /// Actions triggered by data changing in a table. If such actions
+ /// include updates to the local database that should be applied
+ /// atomically with the item update itself, a db transaction is
+ /// provided on which these changes should be done.
+ /// This function can return a DB error but that's all.
+ fn updated(
+ &self,
+ _tx: &mut db::Transaction,
+ _old: Option<&Self::E>,
+ _new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
+ Ok(())
+ }
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool;
}
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 08069ad0..4c83e991 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -258,9 +258,9 @@ where
while !*must_exit.borrow() {
let mut items = Vec::new();
- for item in self.data.store.range(begin.to_vec()..end.to_vec()) {
+ for item in self.data.store.range(begin.to_vec()..end.to_vec())? {
let (key, value) = item?;
- items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
+ items.push((key.to_vec(), Arc::new(ByteBuf::from(value))));
if items.len() >= 1024 {
break;
@@ -603,8 +603,16 @@ impl SyncTodo {
let retain = nodes.contains(&my_id);
if !retain {
// Check if we have some data to send, otherwise skip
- if data.store.range(begin..end).next().is_none() {
- continue;
+ match data.store.range(begin..end) {
+ Ok(mut iter) => {
+ if iter.next().is_none() {
+ continue;
+ }
+ }
+ Err(e) => {
+ warn!("DB error in add_full_sync: {}", e);
+ continue;
+ }
}
}
diff --git a/src/table/table.rs b/src/table/table.rs
index 2a167604..3c211728 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -13,6 +13,8 @@ use opentelemetry::{
Context,
};
+use garage_db as db;
+
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
@@ -69,7 +71,7 @@ where
{
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
- pub fn new(instance: F, replication: R, system: Arc<System>, db: &sled::Db) -> Arc<Self> {
+ pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> {
let endpoint = system
.netapp
.endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME));