Diffstat (limited to 'src/table')
-rw-r--r--  src/table/Cargo.toml   |  10
-rw-r--r--  src/table/data.rs      | 187
-rw-r--r--  src/table/gc.rs        | 139
-rw-r--r--  src/table/merkle.rs    | 150
-rw-r--r--  src/table/metrics.rs   |  21
-rw-r--r--  src/table/schema.rs    |  21
-rw-r--r--  src/table/sync.rs      | 233
-rw-r--r--  src/table/table.rs     | 143
-rw-r--r--  src/table/util.rs      |  18
9 files changed, 551 insertions, 371 deletions
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index ed1a213f..38c6b41c 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_table"
-version = "0.7.0"
+version = "0.8.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,19 +14,19 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_rpc = { version = "0.7.0", path = "../rpc" }
-garage_util = { version = "0.7.0", path = "../util" }
+garage_db = { version = "0.8.0", path = "../db" }
+garage_rpc = { version = "0.8.0", path = "../rpc" }
+garage_util = { version = "0.8.0", path = "../util" }
opentelemetry = "0.17"
async-trait = "0.1.7"
bytes = "1.0"
+hex = "0.4"
hexdump = "0.1"
tracing = "0.1.30"
rand = "0.8"
-sled = "0.34"
-
rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/table/data.rs b/src/table/data.rs
index ff7965f5..3212e82b 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -1,13 +1,15 @@
use core::borrow::Borrow;
+use std::convert::TryInto;
use std::sync::Arc;
use serde_bytes::ByteBuf;
-use sled::Transactional;
use tokio::sync::Notify;
+use garage_db as db;
+use garage_db::counted_tree_hack::CountedTree;
+
use garage_util::data::*;
use garage_util::error::*;
-use garage_util::sled_counter::SledCountedTree;
use garage_rpc::system::System;
@@ -16,19 +18,20 @@ use crate::gc::GcTodoEntry;
use crate::metrics::*;
use crate::replication::*;
use crate::schema::*;
+use crate::util::*;
pub struct TableData<F: TableSchema, R: TableReplication> {
system: Arc<System>,
- pub(crate) instance: F,
- pub(crate) replication: R,
+ pub instance: F,
+ pub replication: R,
- pub store: sled::Tree,
+ pub store: db::Tree,
- pub(crate) merkle_tree: sled::Tree,
- pub(crate) merkle_todo: sled::Tree,
+ pub(crate) merkle_tree: db::Tree,
+ pub(crate) merkle_todo: db::Tree,
pub(crate) merkle_todo_notify: Notify,
- pub(crate) gc_todo: SledCountedTree,
+ pub(crate) gc_todo: CountedTree,
pub(crate) metrics: TableMetrics,
}
@@ -38,7 +41,7 @@ where
F: TableSchema,
R: TableReplication,
{
- pub fn new(system: Arc<System>, instance: F, replication: R, db: &sled::Db) -> Arc<Self> {
+ pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> {
let store = db
.open_tree(&format!("{}:table", F::TABLE_NAME))
.expect("Unable to open DB tree");
@@ -53,7 +56,7 @@ where
let gc_todo = db
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open DB tree");
- let gc_todo = SledCountedTree::new(gc_todo);
+ let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2");
let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone());
@@ -83,18 +86,48 @@ where
pub fn read_range(
&self,
- p: &F::P,
- s: &Option<F::S>,
+ partition_key: &F::P,
+ start: &Option<F::S>,
+ filter: &Option<F::Filter>,
+ limit: usize,
+ enumeration_order: EnumerationOrder,
+ ) -> Result<Vec<Arc<ByteBuf>>, Error> {
+ let partition_hash = partition_key.hash();
+ match enumeration_order {
+ EnumerationOrder::Forward => {
+ let first_key = match start {
+ None => partition_hash.to_vec(),
+ Some(sk) => self.tree_key(partition_key, sk),
+ };
+ let range = self.store.range(first_key..)?;
+ self.read_range_aux(partition_hash, range, filter, limit)
+ }
+ EnumerationOrder::Reverse => match start {
+ Some(sk) => {
+ let last_key = self.tree_key(partition_key, sk);
+ let range = self.store.range_rev(..=last_key)?;
+ self.read_range_aux(partition_hash, range, filter, limit)
+ }
+ None => {
+ let mut last_key = partition_hash.to_vec();
+ let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap());
+ last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1));
+ let range = self.store.range_rev(..last_key)?;
+ self.read_range_aux(partition_hash, range, filter, limit)
+ }
+ },
+ }
+ }
+
+ fn read_range_aux<'a>(
+ &self,
+ partition_hash: Hash,
+ range: db::ValueIter<'a>,
filter: &Option<F::Filter>,
limit: usize,
) -> Result<Vec<Arc<ByteBuf>>, Error> {
- let partition_hash = p.hash();
- let first_key = match s {
- None => partition_hash.to_vec(),
- Some(sk) => self.tree_key(p, sk),
- };
let mut ret = vec![];
- for item in self.store.range(first_key..) {
+ for item in range {
let (key, value) = item?;
if &key[..32] != partition_hash.as_slice() {
break;
@@ -107,7 +140,7 @@ where
}
};
if keep {
- ret.push(Arc::new(ByteBuf::from(value.as_ref())));
+ ret.push(Arc::new(ByteBuf::from(value)));
}
if ret.len() >= limit {
break;
@@ -136,17 +169,29 @@ where
let update = self.decode_entry(update_bytes)?;
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
- let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
- let (old_entry, old_bytes, new_entry) = match store.get(&tree_key)? {
+ self.update_entry_with(&tree_key[..], |ent| match ent {
+ Some(mut ent) => {
+ ent.merge(&update);
+ ent
+ }
+ None => update.clone(),
+ })?;
+ Ok(())
+ }
+
+ pub fn update_entry_with(
+ &self,
+ tree_key: &[u8],
+ f: impl Fn(Option<F::E>) -> F::E,
+ ) -> Result<Option<F::E>, Error> {
+ let changed = self.store.db().transaction(|mut tx| {
+ let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? {
Some(old_bytes) => {
- let old_entry = self
- .decode_entry(&old_bytes)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- let mut new_entry = old_entry.clone();
- new_entry.merge(&update);
+ let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
+ let new_entry = f(Some(old_entry.clone()));
(Some(old_entry), Some(old_bytes), new_entry)
}
- None => (None, None, update.clone()),
+ None => (None, None, f(None)),
};
// Scenario 1: the value changed, so of course there is a change
@@ -158,24 +203,28 @@ where
// the associated Merkle tree entry.
let new_bytes = rmp_to_vec_all_named(&new_entry)
.map_err(Error::RmpEncode)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ .map_err(db::TxError::Abort)?;
let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]);
+ drop(old_bytes);
if value_changed || encoding_changed {
let new_bytes_hash = blake2sum(&new_bytes[..]);
- mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?;
- store.insert(tree_key.clone(), new_bytes)?;
- Ok(Some((old_entry, new_entry, new_bytes_hash)))
+ tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?;
+ tx.insert(&self.store, tree_key, new_bytes)?;
+
+ self.instance
+ .updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?;
+
+ Ok(Some((new_entry, new_bytes_hash)))
} else {
Ok(None)
}
})?;
- if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
+ if let Some((new_entry, new_bytes_hash)) = changed {
self.metrics.internal_update_counter.add(1);
let is_tombstone = new_entry.is_tombstone();
- self.instance.updated(old_entry, Some(new_entry));
self.merkle_todo_notify.notify_one();
if is_tombstone {
// We are only responsible for GC'ing this item if we are the
@@ -187,31 +236,34 @@ where
let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
let nodes = self.replication.write_nodes(&pk_hash);
if nodes.first() == Some(&self.system.id) {
- GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?;
+ GcTodoEntry::new(tree_key.to_vec(), new_bytes_hash).save(&self.gc_todo)?;
}
}
- }
- Ok(())
+ Ok(Some(new_entry))
+ } else {
+ Ok(None)
+ }
}
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
- let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
- if let Some(cur_v) = store.get(k)? {
- if cur_v == v {
- store.remove(k)?;
- mkl_todo.insert(k, vec![])?;
- return Ok(true);
+ let removed = self
+ .store
+ .db()
+ .transaction(|mut tx| match tx.get(&self.store, k)? {
+ Some(cur_v) if cur_v == v => {
+ tx.remove(&self.store, k)?;
+ tx.insert(&self.merkle_todo, k, vec![])?;
+
+ let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
+ self.instance.updated(&mut tx, Some(&old_entry), None)?;
+ Ok(true)
}
- }
- Ok(false)
- })?;
+ _ => Ok(false),
+ })?;
if removed {
self.metrics.internal_delete_counter.add(1);
-
- let old_entry = self.decode_entry(v)?;
- self.instance.updated(Some(old_entry), None);
self.merkle_todo_notify.notify_one();
}
Ok(removed)
@@ -222,36 +274,37 @@ where
k: &[u8],
vhash: Hash,
) -> Result<bool, Error> {
- let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
- if let Some(cur_v) = store.get(k)? {
- if blake2sum(&cur_v[..]) == vhash {
- store.remove(k)?;
- mkl_todo.insert(k, vec![])?;
- return Ok(Some(cur_v));
+ let removed = self
+ .store
+ .db()
+ .transaction(|mut tx| match tx.get(&self.store, k)? {
+ Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
+ tx.remove(&self.store, k)?;
+ tx.insert(&self.merkle_todo, k, vec![])?;
+
+ let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
+ self.instance.updated(&mut tx, Some(&old_entry), None)?;
+ Ok(true)
}
- }
- Ok(None)
- })?;
+ _ => Ok(false),
+ })?;
- if let Some(old_v) = removed {
- let old_entry = self.decode_entry(&old_v[..])?;
- self.instance.updated(Some(old_entry), None);
+ if removed {
+ self.metrics.internal_delete_counter.add(1);
self.merkle_todo_notify.notify_one();
- Ok(true)
- } else {
- Ok(false)
}
+ Ok(removed)
}
// ---- Utility functions ----
- pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
+ pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
let mut ret = p.hash().to_vec();
ret.extend(s.sort_key());
ret
}
- pub(crate) fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
+ pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
Ok(x) => Ok(x),
Err(e) => match F::try_migrate(bytes) {
@@ -267,7 +320,7 @@ where
}
}
- pub fn gc_todo_len(&self) -> usize {
- self.gc_todo.len()
+ pub fn gc_todo_len(&self) -> Result<usize, Error> {
+ Ok(self.gc_todo.len())
}
}
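
Note on the data.rs changes above: the sled multi-tree transaction is replaced by `self.store.db().transaction(|mut tx| ...)`, and the read-modify-write logic is factored into `update_entry_with`, which applies a caller-supplied merge closure `Fn(Option<F::E>) -> F::E` and only writes back (and schedules Merkle/GC work) when the entry actually changed. Below is a minimal self-contained sketch of that closure-based pattern over a plain in-memory map; the `Entry` type, its max-based `merge`, and the map-backed store are stand-ins, not the actual garage_table types.

use std::collections::BTreeMap;

#[derive(Clone, Debug, PartialEq)]
struct Entry {
    value: u64,
}

impl Entry {
    fn merge(&mut self, other: &Entry) {
        // CRDT-style merge, here simply "keep the maximum" (stand-in for the real logic).
        self.value = self.value.max(other.value);
    }
}

// Apply a merge closure to the previous value (if any) and write back only on change.
fn update_entry_with(
    store: &mut BTreeMap<Vec<u8>, Entry>,
    key: &[u8],
    f: impl Fn(Option<Entry>) -> Entry,
) -> Option<Entry> {
    let old = store.get(key).cloned();
    let new = f(old.clone());
    if old.as_ref() != Some(&new) {
        store.insert(key.to_vec(), new.clone());
        Some(new)
    } else {
        None
    }
}

fn main() {
    let mut store = BTreeMap::new();
    let update = Entry { value: 3 };
    // Same shape as TableData::update_entry: merge into the existing entry,
    // or start from the update itself if the key is absent.
    let changed = update_entry_with(&mut store, b"k", |ent| match ent {
        Some(mut ent) => {
            ent.merge(&update);
            ent
        }
        None => update.clone(),
    });
    println!("changed entry: {:?}", changed);
}

In the patch, `update_entry` itself becomes just such a call: a closure that merges the decoded update into the previous entry, or starts from the update when the key is absent.
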
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 2a05b6ae..83e7eeff 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -8,13 +8,13 @@ use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use futures::future::join_all;
-use futures::select;
-use futures_util::future::*;
use tokio::sync::watch;
+use garage_db::counted_tree_hack::CountedTree;
+
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
-use garage_util::sled_counter::SledCountedTree;
use garage_util::time::*;
use garage_rpc::system::System;
@@ -25,7 +25,6 @@ use crate::replication::*;
use crate::schema::*;
const TABLE_GC_BATCH_SIZE: usize = 1024;
-const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
// GC delay for table entries: 1 day (24 hours)
// (the delay before the entry is added in the GC todo list
@@ -68,50 +67,24 @@ where
gc.endpoint.set_handler(gc.clone());
- let gc1 = gc.clone();
- system.background.spawn_worker(
- format!("GC loop for {}", F::TABLE_NAME),
- move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit),
- );
+ system.background.spawn_worker(GcWorker::new(gc.clone()));
gc
}
- async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
- while !*must_exit.borrow() {
- match self.gc_loop_iter().await {
- Ok(None) => {
- // Stuff was done, loop immediately
- }
- Ok(Some(wait_delay)) => {
- // Nothing was done, wait specified delay.
- select! {
- _ = tokio::time::sleep(wait_delay).fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- }
- Err(e) => {
- warn!("({}) Error doing GC: {}", F::TABLE_NAME, e);
- }
- }
- }
- }
-
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
let now = now_msec();
- let mut entries = vec![];
- let mut excluded = vec![];
-
// List entries in the GC todo list
// These entries are put there when a tombstone is inserted in the table
// (see update_entry in data.rs)
- for entry_kv in self.data.gc_todo.iter() {
+ let mut candidates = vec![];
+ for entry_kv in self.data.gc_todo.iter()? {
let (k, vhash) = entry_kv?;
- let mut todo_entry = GcTodoEntry::parse(&k, &vhash);
+ let todo_entry = GcTodoEntry::parse(&k, &vhash);
if todo_entry.deletion_time() > now {
- if entries.is_empty() && excluded.is_empty() {
+ if candidates.is_empty() {
// If the earliest entry in the todo list shouldn't yet be processed,
// return a duration to wait in the loop
return Ok(Some(Duration::from_millis(
@@ -123,15 +96,23 @@ where
}
}
- let vhash = Hash::try_from(&vhash[..]).unwrap();
+ candidates.push(todo_entry);
+ if candidates.len() >= 2 * TABLE_GC_BATCH_SIZE {
+ break;
+ }
+ }
+ let mut entries = vec![];
+ let mut excluded = vec![];
+ for mut todo_entry in candidates {
// Check if the tombstone is still the current value of the entry.
// If not, we don't actually want to GC it, and we will remove it
// from the gc_todo table later (below).
+ let vhash = todo_entry.value_hash;
todo_entry.value = self
.data
.store
- .get(&k[..])?
+ .get(&todo_entry.key[..])?
.filter(|v| blake2sum(&v[..]) == vhash)
.map(|v| v.to_vec());
@@ -254,9 +235,7 @@ where
&self.endpoint,
&nodes[..],
GcRpc::Update(updates),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_quorum(nodes.len())
- .with_timeout(TABLE_GC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
.await
.err_context("GC: send tombstones")?;
@@ -277,9 +256,7 @@ where
&self.endpoint,
&nodes[..],
GcRpc::DeleteIfEqualHash(deletes),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_quorum(nodes.len())
- .with_timeout(TABLE_GC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
.await
.err_context("GC: remote delete tombstones")?;
@@ -321,6 +298,66 @@ where
}
}
+struct GcWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ gc: Arc<TableGc<F, R>>,
+ wait_delay: Duration,
+}
+
+impl<F, R> GcWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn new(gc: Arc<TableGc<F, R>>) -> Self {
+ Self {
+ gc,
+ wait_delay: Duration::from_secs(0),
+ }
+ }
+}
+
+#[async_trait]
+impl<F, R> Worker for GcWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn name(&self) -> String {
+ format!("{} GC", F::TABLE_NAME)
+ }
+
+ fn info(&self) -> Option<String> {
+ let l = self.gc.data.gc_todo_len().unwrap_or(0);
+ if l > 0 {
+ Some(format!("{} items in queue", l))
+ } else {
+ None
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ match self.gc.gc_loop_iter().await? {
+ None => Ok(WorkerState::Busy),
+ Some(delay) => {
+ self.wait_delay = delay;
+ Ok(WorkerState::Idle)
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
+ if *must_exit.borrow() {
+ return WorkerState::Done;
+ }
+ tokio::time::sleep(self.wait_delay).await;
+ WorkerState::Busy
+ }
+}
+
/// An entry stored in the gc_todo Sled tree associated with the table
/// Contains helper functions for parsing, saving, and removing
/// such an entry in Sled
@@ -353,17 +390,17 @@ impl GcTodoEntry {
}
/// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree
- pub(crate) fn parse(sled_k: &[u8], sled_v: &[u8]) -> Self {
+ pub(crate) fn parse(db_k: &[u8], db_v: &[u8]) -> Self {
Self {
- tombstone_timestamp: u64::from_be_bytes(sled_k[0..8].try_into().unwrap()),
- key: sled_k[8..].to_vec(),
- value_hash: Hash::try_from(sled_v).unwrap(),
+ tombstone_timestamp: u64::from_be_bytes(db_k[0..8].try_into().unwrap()),
+ key: db_k[8..].to_vec(),
+ value_hash: Hash::try_from(db_v).unwrap(),
value: None,
}
}
/// Saves the GcTodoEntry in the gc_todo tree
- pub(crate) fn save(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> {
+ pub(crate) fn save(&self, gc_todo_tree: &CountedTree) -> Result<(), Error> {
gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
Ok(())
}
@@ -373,9 +410,9 @@ impl GcTodoEntry {
/// This is useful to remove a todo entry only under the condition
/// that it has not changed since the time it was read, i.e.
/// what we have to do is still the same
- pub(crate) fn remove_if_equal(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> {
- let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>(
- &self.todo_table_key()[..],
+ pub(crate) fn remove_if_equal(&self, gc_todo_tree: &CountedTree) -> Result<(), Error> {
+ gc_todo_tree.compare_and_swap::<_, _, &[u8]>(
+ &self.todo_table_key(),
Some(self.value_hash),
None,
)?;
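
The gc.rs changes above replace the hand-rolled `gc_loop` with a `GcWorker` implementing the `Worker` trait from `garage_util::background`: `work` runs one `gc_loop_iter` and reports Busy or Idle, while `wait_for_work` sleeps for the returned delay and becomes Done on shutdown. Below is a simplified, self-contained sketch of that worker shape; the trait definition, `CountdownWorker`, and the driver loop in `main` are stand-ins for the real background runner, not its actual API.

use std::time::Duration;

use async_trait::async_trait;
use tokio::sync::watch;

#[derive(Debug, PartialEq)]
enum WorkerState {
    Busy,
    Idle,
    Done,
}

#[async_trait]
trait Worker: Send {
    fn name(&self) -> String;
    // One unit of work; report whether there is more to do.
    async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerState;
    // Wait before the next attempt, or signal termination on shutdown.
    async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState;
}

struct CountdownWorker {
    remaining: u32,
    wait_delay: Duration,
}

#[async_trait]
impl Worker for CountdownWorker {
    fn name(&self) -> String {
        "countdown".to_string()
    }

    async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> WorkerState {
        if self.remaining > 0 {
            self.remaining -= 1;
            WorkerState::Busy
        } else {
            WorkerState::Idle
        }
    }

    async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
        if *must_exit.borrow() {
            return WorkerState::Done;
        }
        tokio::time::sleep(self.wait_delay).await;
        WorkerState::Busy
    }
}

#[tokio::main]
async fn main() {
    let (_stop_tx, mut must_exit) = watch::channel(false);
    let mut w = CountdownWorker {
        remaining: 3,
        wait_delay: Duration::from_millis(10),
    };
    // Drive the worker the way a background runner would: call work() while it
    // reports Busy, then wait_for_work() once it goes Idle.
    while w.work(&mut must_exit).await == WorkerState::Busy {}
    let next = w.wait_for_work(&must_exit).await;
    println!("{}: idle, state after waiting = {:?}", w.name(), next);
}

Structuring the loops this way lets the runner expose each worker's `name()` and `info()` (here, the gc_todo queue length) uniformly and handle shutdown in one place.
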
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 93bf7e47..a5c29723 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -1,15 +1,13 @@
use std::sync::Arc;
use std::time::Duration;
-use futures::select;
-use futures_util::future::*;
+use async_trait::async_trait;
use serde::{Deserialize, Serialize};
-use sled::transaction::{
- ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree,
-};
use tokio::sync::watch;
-use garage_util::background::BackgroundRunner;
+use garage_db as db;
+
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::Error;
@@ -79,43 +77,17 @@ where
empty_node_hash,
});
- let ret2 = ret.clone();
- background.spawn_worker(
- format!("Merkle tree updater for {}", F::TABLE_NAME),
- |must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit),
- );
+ background.spawn_worker(MerkleWorker(ret.clone()));
ret
}
- async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
- while !*must_exit.borrow() {
- if let Some(x) = self.data.merkle_todo.iter().next() {
- match x {
- Ok((key, valhash)) => {
- if let Err(e) = self.update_item(&key[..], &valhash[..]) {
- warn!(
- "({}) Error while updating Merkle tree item: {}",
- F::TABLE_NAME,
- e
- );
- }
- }
- Err(e) => {
- warn!(
- "({}) Error while iterating on Merkle todo tree: {}",
- F::TABLE_NAME,
- e
- );
- tokio::time::sleep(Duration::from_secs(10)).await;
- }
- }
- } else {
- select! {
- _ = self.data.merkle_todo_notify.notified().fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- }
+ fn updater_loop_iter(&self) -> Result<WorkerState, Error> {
+ if let Some((key, valhash)) = self.data.merkle_todo.first()? {
+ self.update_item(&key, &valhash)?;
+ Ok(WorkerState::Busy)
+ } else {
+ Ok(WorkerState::Idle)
}
}
@@ -137,13 +109,16 @@ where
};
self.data
.merkle_tree
- .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?;
+ .db()
+ .transaction(|mut tx| self.update_item_rec(&mut tx, k, &khash, &key, new_vhash))?;
- let deleted = self
- .data
- .merkle_todo
- .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)?
- .is_ok();
+ let deleted = self.data.merkle_todo.db().transaction(|mut tx| {
+ let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by);
+ if remove {
+ tx.remove(&self.data.merkle_todo, k)?;
+ }
+ Ok(remove)
+ })?;
if !deleted {
debug!(
@@ -157,12 +132,12 @@ where
fn update_item_rec(
&self,
- tx: &TransactionalTree,
+ tx: &mut db::Transaction<'_>,
k: &[u8],
khash: &Hash,
key: &MerkleNodeKey,
new_vhash: Option<Hash>,
- ) -> ConflictableTransactionResult<Option<Hash>, Error> {
+ ) -> db::TxResult<Option<Hash>, Error> {
let i = key.prefix.len();
// Read node at current position (defined by the prefix stored in key)
@@ -203,7 +178,7 @@ where
}
MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)),
x @ MerkleNode::Leaf(_, _) => {
- tx.remove(key_sub.encode())?;
+ tx.remove(&self.data.merkle_tree, key_sub.encode())?;
Some(x)
}
}
@@ -283,28 +258,27 @@ where
fn read_node_txn(
&self,
- tx: &TransactionalTree,
+ tx: &mut db::Transaction<'_>,
k: &MerkleNodeKey,
- ) -> ConflictableTransactionResult<MerkleNode, Error> {
- let ent = tx.get(k.encode())?;
- MerkleNode::decode_opt(ent).map_err(ConflictableTransactionError::Abort)
+ ) -> db::TxResult<MerkleNode, Error> {
+ let ent = tx.get(&self.data.merkle_tree, k.encode())?;
+ MerkleNode::decode_opt(&ent).map_err(db::TxError::Abort)
}
fn put_node_txn(
&self,
- tx: &TransactionalTree,
+ tx: &mut db::Transaction<'_>,
k: &MerkleNodeKey,
v: &MerkleNode,
- ) -> ConflictableTransactionResult<Hash, Error> {
+ ) -> db::TxResult<Hash, Error> {
trace!("Put Merkle node: {:?} => {:?}", k, v);
if *v == MerkleNode::Empty {
- tx.remove(k.encode())?;
+ tx.remove(&self.data.merkle_tree, k.encode())?;
Ok(self.empty_node_hash)
} else {
- let vby = rmp_to_vec_all_named(v)
- .map_err(|e| ConflictableTransactionError::Abort(e.into()))?;
+ let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?;
let rethash = blake2sum(&vby[..]);
- tx.insert(k.encode(), vby)?;
+ tx.insert(&self.data.merkle_tree, k.encode(), vby)?;
Ok(rethash)
}
}
@@ -312,15 +286,63 @@ where
// Access a node in the Merkle tree, used by the sync protocol
pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result<MerkleNode, Error> {
let ent = self.data.merkle_tree.get(k.encode())?;
- MerkleNode::decode_opt(ent)
+ MerkleNode::decode_opt(&ent)
+ }
+
+ pub fn merkle_tree_len(&self) -> Result<usize, Error> {
+ Ok(self.data.merkle_tree.len()?)
}
- pub fn merkle_tree_len(&self) -> usize {
- self.data.merkle_tree.len()
+ pub fn todo_len(&self) -> Result<usize, Error> {
+ Ok(self.data.merkle_todo.len()?)
}
+}
+
+struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>)
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static;
- pub fn todo_len(&self) -> usize {
- self.data.merkle_todo.len()
+#[async_trait]
+impl<F, R> Worker for MerkleWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn name(&self) -> String {
+ format!("{} Merkle tree updater", F::TABLE_NAME)
+ }
+
+ fn info(&self) -> Option<String> {
+ let l = self.0.todo_len().unwrap_or(0);
+ if l > 0 {
+ Some(format!("{} items in queue", l))
+ } else {
+ None
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ let updater = self.0.clone();
+ tokio::task::spawn_blocking(move || {
+ for _i in 0..100 {
+ let s = updater.updater_loop_iter();
+ if !matches!(s, Ok(WorkerState::Busy)) {
+ return s;
+ }
+ }
+ Ok(WorkerState::Busy)
+ })
+ .await
+ .unwrap()
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
+ if *must_exit.borrow() {
+ return WorkerState::Done;
+ }
+ tokio::time::sleep(Duration::from_secs(10)).await;
+ WorkerState::Busy
}
}
@@ -347,7 +369,7 @@ impl MerkleNodeKey {
}
impl MerkleNode {
- fn decode_opt(ent: Option<sled::IVec>) -> Result<Self, Error> {
+ fn decode_opt(ent: &Option<db::Value>) -> Result<Self, Error> {
match ent {
None => Ok(MerkleNode::Empty),
Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),
diff --git a/src/table/metrics.rs b/src/table/metrics.rs
index 752a2a6d..3a1783e0 100644
--- a/src/table/metrics.rs
+++ b/src/table/metrics.rs
@@ -1,6 +1,7 @@
use opentelemetry::{global, metrics::*, KeyValue};
-use garage_util::sled_counter::SledCountedTree;
+use garage_db as db;
+use garage_db::counted_tree_hack::CountedTree;
/// TableMetrics reference all counter used for metrics
pub struct TableMetrics {
@@ -19,21 +20,19 @@ pub struct TableMetrics {
pub(crate) sync_items_received: Counter<u64>,
}
impl TableMetrics {
- pub fn new(
- table_name: &'static str,
- merkle_todo: sled::Tree,
- gc_todo: SledCountedTree,
- ) -> Self {
+ pub fn new(table_name: &'static str, merkle_todo: db::Tree, gc_todo: CountedTree) -> Self {
let meter = global::meter(table_name);
TableMetrics {
_merkle_todo_len: meter
.u64_value_observer(
"table.merkle_updater_todo_queue_length",
move |observer| {
- observer.observe(
- merkle_todo.len() as u64,
- &[KeyValue::new("table_name", table_name)],
- )
+ if let Ok(v) = merkle_todo.len() {
+ observer.observe(
+ v as u64,
+ &[KeyValue::new("table_name", table_name)],
+ );
+ }
},
)
.with_description("Merkle tree updater TODO queue length")
@@ -45,7 +44,7 @@ impl TableMetrics {
observer.observe(
gc_todo.len() as u64,
&[KeyValue::new("table_name", table_name)],
- )
+ );
},
)
.with_description("Table garbage collector TODO queue length")
diff --git a/src/table/schema.rs b/src/table/schema.rs
index eba918a2..f37e98d8 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -1,5 +1,6 @@
use serde::{Deserialize, Serialize};
+use garage_db as db;
use garage_util::data::*;
use crate::crdt::Crdt;
@@ -59,7 +60,7 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
}
/// Trait for the schema used in a table
-pub trait TableSchema: Send + Sync {
+pub trait TableSchema: Send + Sync + 'static {
/// The name of the table in the database
const TABLE_NAME: &'static str;
@@ -82,11 +83,19 @@ pub trait TableSchema: Send + Sync {
None
}
- // Updated triggers some stuff downstream, but it is not supposed to block or fail,
- // as the update itself is an unchangeable fact that will never go back
- // due to CRDT logic. Typically errors in propagation of info should be logged
- // to stderr.
- fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {}
+ /// Actions triggered by data changing in a table. If such actions
+ /// include updates to the local database that should be applied
+ /// atomically with the item update itself, a db transaction is
+ /// provided on which these changes should be done.
+ /// This function can return a DB error but that's all.
+ fn updated(
+ &self,
+ _tx: &mut db::Transaction,
+ _old: Option<&Self::E>,
+ _new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
+ Ok(())
+ }
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool;
}
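
The key schema.rs change is the new `updated` signature: the hook now receives the ongoing `&mut db::Transaction` and returns `db::TxOpResult<()>`, so secondary writes triggered by an item change commit atomically with the item itself instead of running after the fact. Below is a self-contained sketch of the idea with simplified stand-in types; `Transaction`, `Schema`, and `IndexingSchema` are illustrative, not the garage_db/garage_table API.

use std::collections::BTreeMap;

// Staged writes standing in for a real database transaction.
struct Transaction {
    writes: Vec<(Vec<u8>, Vec<u8>)>,
}

impl Transaction {
    fn insert(&mut self, k: &[u8], v: &[u8]) {
        self.writes.push((k.to_vec(), v.to_vec()));
    }
}

trait Schema {
    // New shape: the hook receives the ongoing transaction and may fail,
    // instead of running as a plain notification after the commit.
    fn updated(
        &self,
        tx: &mut Transaction,
        old: Option<&str>,
        new: Option<&str>,
    ) -> Result<(), String>;
}

struct IndexingSchema;

impl Schema for IndexingSchema {
    fn updated(
        &self,
        tx: &mut Transaction,
        old: Option<&str>,
        new: Option<&str>,
    ) -> Result<(), String> {
        // Maintain a derived record in the same transaction as the item write.
        if old.is_none() {
            if let Some(new) = new {
                tx.insert(b"meta:last_insert", new.as_bytes());
            }
        }
        Ok(())
    }
}

fn main() {
    let mut store: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
    let schema = IndexingSchema;

    let mut tx = Transaction { writes: vec![] };
    tx.insert(b"item:1", b"hello");
    schema.updated(&mut tx, None, Some("hello")).unwrap();

    // "Commit": both the item and the hook's side effect land together.
    for (k, v) in tx.writes {
        store.insert(k, v);
    }
    println!("{} keys committed", store.len());
}
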
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 08069ad0..9d79d856 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -1,17 +1,17 @@
use std::collections::VecDeque;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
-use futures::select;
-use futures_util::future::*;
use futures_util::stream::*;
use opentelemetry::KeyValue;
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
+use tokio::select;
use tokio::sync::{mpsc, watch};
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::Error;
@@ -24,8 +24,6 @@ use crate::merkle::*;
use crate::replication::*;
use crate::*;
-const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
-
// Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
@@ -34,7 +32,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static>
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
- todo: Mutex<SyncTodo>,
+ add_full_sync_tx: mpsc::UnboundedSender<()>,
endpoint: Arc<Endpoint<SyncRpc, Self>>,
}
@@ -52,10 +50,6 @@ impl Rpc for SyncRpc {
type Response = Result<SyncRpc, Error>;
}
-struct SyncTodo {
- todo: Vec<TodoPartition>,
-}
-
#[derive(Debug, Clone)]
struct TodoPartition {
partition: Partition,
@@ -80,118 +74,40 @@ where
.netapp
.endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME));
- let todo = SyncTodo { todo: vec![] };
+ let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
let syncer = Arc::new(Self {
system: system.clone(),
data,
merkle,
- todo: Mutex::new(todo),
+ add_full_sync_tx,
endpoint,
});
syncer.endpoint.set_handler(syncer.clone());
- let (busy_tx, busy_rx) = mpsc::unbounded_channel();
-
- let s1 = syncer.clone();
- system.background.spawn_worker(
- format!("table sync watcher for {}", F::TABLE_NAME),
- move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
- );
-
- let s2 = syncer.clone();
- system.background.spawn_worker(
- format!("table syncer for {}", F::TABLE_NAME),
- move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
- );
-
- let s3 = syncer.clone();
- tokio::spawn(async move {
- tokio::time::sleep(Duration::from_secs(20)).await;
- s3.add_full_sync();
+ system.background.spawn_worker(SyncWorker {
+ syncer: syncer.clone(),
+ ring_recv: system.ring.clone(),
+ ring: system.ring.borrow().clone(),
+ add_full_sync_rx,
+ todo: vec![],
+ next_full_sync: Instant::now() + Duration::from_secs(20),
});
syncer
}
- async fn watcher_task(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- mut busy_rx: mpsc::UnboundedReceiver<bool>,
- ) {
- let mut prev_ring: Arc<Ring> = self.system.ring.borrow().clone();
- let mut ring_recv: watch::Receiver<Arc<Ring>> = self.system.ring.clone();
- let mut nothing_to_do_since = Some(Instant::now());
-
- while !*must_exit.borrow() {
- select! {
- _ = ring_recv.changed().fuse() => {
- let new_ring = ring_recv.borrow();
- if !Arc::ptr_eq(&new_ring, &prev_ring) {
- debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
- self.add_full_sync();
- prev_ring = new_ring.clone();
- }
- }
- busy_opt = busy_rx.recv().fuse() => {
- if let Some(busy) = busy_opt {
- if busy {
- nothing_to_do_since = None;
- } else if nothing_to_do_since.is_none() {
- nothing_to_do_since = Some(Instant::now());
- }
- }
- }
- _ = must_exit.changed().fuse() => {},
- _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
- if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
- nothing_to_do_since = None;
- debug!("({}) Interval passed, adding full sync to syncer todo list", F::TABLE_NAME);
- self.add_full_sync();
- }
- }
- }
- }
- }
-
pub fn add_full_sync(&self) {
- self.todo
- .lock()
- .unwrap()
- .add_full_sync(&self.data, &self.system);
- }
-
- async fn syncer_task(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- busy_tx: mpsc::UnboundedSender<bool>,
- ) {
- while !*must_exit.borrow() {
- let task = self.todo.lock().unwrap().pop_task();
- if let Some(partition) = task {
- busy_tx.send(true).unwrap();
- let res = self
- .clone()
- .sync_partition(&partition, &mut must_exit)
- .await;
- if let Err(e) = res {
- warn!(
- "({}) Error while syncing {:?}: {}",
- F::TABLE_NAME,
- partition,
- e
- );
- }
- } else {
- busy_tx.send(false).unwrap();
- tokio::time::sleep(Duration::from_secs(1)).await;
- }
+ if self.add_full_sync_tx.send(()).is_err() {
+ error!("({}) Could not add full sync", F::TABLE_NAME);
}
}
+ // ----
+
async fn sync_partition(
- self: Arc<Self>,
+ self: &Arc<Self>,
partition: &TodoPartition,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
@@ -258,9 +174,9 @@ where
while !*must_exit.borrow() {
let mut items = Vec::new();
- for item in self.data.store.range(begin.to_vec()..end.to_vec()) {
+ for item in self.data.store.range(begin.to_vec()..end.to_vec())? {
let (key, value) = item?;
- items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
+ items.push((key.to_vec(), Arc::new(ByteBuf::from(value))));
if items.len() >= 1024 {
break;
@@ -329,9 +245,7 @@ where
&self.endpoint,
nodes,
SyncRpc::Items(values),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_quorum(nodes.len())
- .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
.await?;
@@ -392,8 +306,7 @@ where
&self.endpoint,
who,
SyncRpc::RootCkHash(partition.partition, root_ck_hash),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND),
)
.await?;
@@ -432,11 +345,11 @@ where
// Just send that item directly
if let Some(val) = self.data.store.get(&ik[..])? {
if blake2sum(&val[..]) != ivhash {
- warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik);
+ debug!("({}) Hashes differ between stored value and Merkle tree, key: {} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, hex::encode(ik));
}
todo_items.push(val.to_vec());
} else {
- warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik);
+ debug!("({}) Item from Merkle tree not found in store: {} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, hex::encode(ik));
}
}
MerkleNode::Intermediate(l) => {
@@ -449,8 +362,7 @@ where
&self.endpoint,
who,
SyncRpc::GetNode(key.clone()),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND),
)
.await?
{
@@ -526,8 +438,7 @@ where
&self.endpoint,
who,
SyncRpc::Items(values),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND),
)
.await?;
if let SyncRpc::Ok = rpc_resp {
@@ -577,12 +488,22 @@ where
}
}
-impl SyncTodo {
- fn add_full_sync<F: TableSchema, R: TableReplication>(
- &mut self,
- data: &TableData<F, R>,
- system: &System,
- ) {
+// -------- Sync Worker ---------
+
+struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
+ syncer: Arc<TableSyncer<F, R>>,
+ ring_recv: watch::Receiver<Arc<Ring>>,
+ ring: Arc<Ring>,
+ add_full_sync_rx: mpsc::UnboundedReceiver<()>,
+ todo: Vec<TodoPartition>,
+ next_full_sync: Instant,
+}
+
+impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
+ fn add_full_sync(&mut self) {
+ let system = &self.syncer.system;
+ let data = &self.syncer.data;
+
let my_id = system.id;
self.todo.clear();
@@ -603,8 +524,16 @@ impl SyncTodo {
let retain = nodes.contains(&my_id);
if !retain {
// Check if we have some data to send, otherwise skip
- if data.store.range(begin..end).next().is_none() {
- continue;
+ match data.store.range(begin..end) {
+ Ok(mut iter) => {
+ if iter.next().is_none() {
+ continue;
+ }
+ }
+ Err(e) => {
+ warn!("DB error in add_full_sync: {}", e);
+ continue;
+ }
}
}
@@ -615,6 +544,8 @@ impl SyncTodo {
retain,
});
}
+
+ self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
}
fn pop_task(&mut self) -> Option<TodoPartition> {
@@ -633,6 +564,62 @@ impl SyncTodo {
}
}
+#[async_trait]
+impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> {
+ fn name(&self) -> String {
+ format!("{} sync", F::TABLE_NAME)
+ }
+
+ fn info(&self) -> Option<String> {
+ let l = self.todo.len();
+ if l > 0 {
+ Some(format!("{} partitions remaining", l))
+ } else {
+ None
+ }
+ }
+
+ async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ if let Some(partition) = self.pop_task() {
+ self.syncer.sync_partition(&partition, must_exit).await?;
+ Ok(WorkerState::Busy)
+ } else {
+ Ok(WorkerState::Idle)
+ }
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
+ if *must_exit.borrow() {
+ return WorkerState::Done;
+ }
+ select! {
+ s = self.add_full_sync_rx.recv() => {
+ if let Some(()) = s {
+ self.add_full_sync();
+ }
+ },
+ _ = self.ring_recv.changed() => {
+ let new_ring = self.ring_recv.borrow();
+ if !Arc::ptr_eq(&new_ring, &self.ring) {
+ self.ring = new_ring.clone();
+ drop(new_ring);
+ debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
+ self.add_full_sync();
+ }
+ },
+ _ = tokio::time::sleep_until(self.next_full_sync.into()) => {
+ self.add_full_sync();
+ }
+ }
+ match self.todo.is_empty() {
+ false => WorkerState::Busy,
+ true => WorkerState::Idle,
+ }
+ }
+}
+
+// ---- UTIL ----
+
fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
}
diff --git a/src/table/table.rs b/src/table/table.rs
index 7f87a449..8a66c420 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -1,6 +1,6 @@
-use std::collections::{BTreeMap, HashMap};
+use std::borrow::Borrow;
+use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
-use std::time::Duration;
use async_trait::async_trait;
use futures::stream::*;
@@ -12,6 +12,8 @@ use opentelemetry::{
Context,
};
+use garage_db as db;
+
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
@@ -26,8 +28,7 @@ use crate::merkle::*;
use crate::replication::*;
use crate::schema::*;
use crate::sync::*;
-
-const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
+use crate::util::*;
pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
pub system: Arc<System>,
@@ -45,7 +46,13 @@ pub(crate) enum TableRpc<F: TableSchema> {
ReadEntryResponse(Option<ByteBuf>),
// Read range: read all keys in partition P, possibly starting at a certain sort key offset
- ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize),
+ ReadRange {
+ partition: F::P,
+ begin_sort_key: Option<F::S>,
+ filter: Option<F::Filter>,
+ limit: usize,
+ enumeration_order: EnumerationOrder,
+ },
Update(Vec<Arc<ByteBuf>>),
}
@@ -61,7 +68,7 @@ where
{
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
- pub fn new(instance: F, replication: R, system: Arc<System>, db: &sled::Db) -> Arc<Self> {
+ pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> {
let endpoint = system
.netapp
.endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME));
@@ -103,7 +110,6 @@ where
async fn insert_internal(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
let who = self.data.replication.write_nodes(&hash);
- //eprintln!("insert who: {:?}", who);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
let rpc = TableRpc::<F>::Update(vec![e_enc]);
@@ -115,17 +121,20 @@ where
&who[..],
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(self.data.replication.write_quorum())
- .with_timeout(TABLE_RPC_TIMEOUT),
+ .with_quorum(self.data.replication.write_quorum()),
)
.await?;
Ok(())
}
- pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> {
+ pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error>
+ where
+ I: IntoIterator<Item = IE> + Send + Sync,
+ IE: Borrow<F::E> + Send + Sync,
+ {
let tracer = opentelemetry::global::tracer("garage_table");
- let span = tracer.start(format!("{} insert_many {}", F::TABLE_NAME, entries.len()));
+ let span = tracer.start(format!("{} insert_many", F::TABLE_NAME));
self.insert_many_internal(entries)
.bound_record_duration(&self.data.metrics.put_request_duration)
@@ -137,10 +146,15 @@ where
Ok(())
}
- async fn insert_many_internal(&self, entries: &[F::E]) -> Result<(), Error> {
+ async fn insert_many_internal<I, IE>(&self, entries: I) -> Result<(), Error>
+ where
+ I: IntoIterator<Item = IE> + Send + Sync,
+ IE: Borrow<F::E> + Send + Sync,
+ {
let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
- for entry in entries.iter() {
+ for entry in entries.into_iter() {
+ let entry = entry.borrow();
let hash = entry.partition_key().hash();
let who = self.data.replication.write_nodes(&hash);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
@@ -159,7 +173,7 @@ where
&self.endpoint,
node,
rpc,
- RequestStrategy::with_priority(PRIO_NORMAL).with_timeout(TABLE_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_NORMAL),
)
.await?;
Ok::<_, Error>((node, resp))
@@ -216,7 +230,6 @@ where
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.read_quorum())
- .with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
@@ -261,12 +274,19 @@ where
begin_sort_key: Option<F::S>,
filter: Option<F::Filter>,
limit: usize,
+ enumeration_order: EnumerationOrder,
) -> Result<Vec<F::E>, Error> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} get_range", F::TABLE_NAME));
let res = self
- .get_range_internal(partition_key, begin_sort_key, filter, limit)
+ .get_range_internal(
+ partition_key,
+ begin_sort_key,
+ filter,
+ limit,
+ enumeration_order,
+ )
.bound_record_duration(&self.data.metrics.get_request_duration)
.with_context(Context::current_with_span(span))
.await?;
@@ -282,11 +302,18 @@ where
begin_sort_key: Option<F::S>,
filter: Option<F::Filter>,
limit: usize,
+ enumeration_order: EnumerationOrder,
) -> Result<Vec<F::E>, Error> {
let hash = partition_key.hash();
let who = self.data.replication.read_nodes(&hash);
- let rpc = TableRpc::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
+ let rpc = TableRpc::<F>::ReadRange {
+ partition: partition_key.clone(),
+ begin_sort_key,
+ filter,
+ limit,
+ enumeration_order,
+ };
let resps = self
.system
@@ -297,49 +324,69 @@ where
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.read_quorum())
- .with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
- let mut ret = BTreeMap::new();
- let mut to_repair = BTreeMap::new();
+ let mut ret: BTreeMap<Vec<u8>, F::E> = BTreeMap::new();
+ let mut to_repair = BTreeSet::new();
for resp in resps {
if let TableRpc::Update(entries) = resp {
for entry_bytes in entries.iter() {
let entry = self.data.decode_entry(entry_bytes.as_slice())?;
let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key());
- match ret.remove(&entry_key) {
- None => {
- ret.insert(entry_key, Some(entry));
- }
- Some(Some(mut prev)) => {
- let must_repair = prev != entry;
- prev.merge(&entry);
- if must_repair {
- to_repair.insert(entry_key.clone(), Some(prev.clone()));
+ match ret.get_mut(&entry_key) {
+ Some(e) => {
+ if *e != entry {
+ e.merge(&entry);
+ to_repair.insert(entry_key.clone());
}
- ret.insert(entry_key, Some(prev));
}
- Some(None) => unreachable!(),
+ None => {
+ ret.insert(entry_key, entry);
+ }
}
}
+ } else {
+ return Err(Error::unexpected_rpc_message(resp));
}
}
+
if !to_repair.is_empty() {
let self2 = self.clone();
+ let to_repair = to_repair
+ .into_iter()
+ .map(|k| ret.get(&k).unwrap().clone())
+ .collect::<Vec<_>>();
self.system.background.spawn_cancellable(async move {
- for (_, v) in to_repair.iter_mut() {
- self2.repair_on_read(&who[..], v.take().unwrap()).await?;
+ for v in to_repair {
+ self2.repair_on_read(&who[..], v).await?;
}
Ok(())
});
}
- let ret_vec = ret
- .iter_mut()
- .take(limit)
- .map(|(_k, v)| v.take().unwrap())
- .collect::<Vec<_>>();
+
+ // At this point, the `ret` btreemap might contain more than `limit`
+ // items, because nodes might have returned us each `limit` items
+ // but for different keys. We have to take only the first `limit` items
+ // in this map, in the specified enumeration order, for two reasons:
+ // 1. To return to the user no more than the number of items that they requested
+ // 2. To return only items for which we have a read quorum: we do not know
+ // that we have a read quorum for the items after the first `limit`
+ // of them
+ let ret_vec = match enumeration_order {
+ EnumerationOrder::Forward => ret
+ .into_iter()
+ .take(limit)
+ .map(|(_k, v)| v)
+ .collect::<Vec<_>>(),
+ EnumerationOrder::Reverse => ret
+ .into_iter()
+ .rev()
+ .take(limit)
+ .map(|(_k, v)| v)
+ .collect::<Vec<_>>(),
+ };
Ok(ret_vec)
}
@@ -353,9 +400,7 @@ where
&self.endpoint,
who,
TableRpc::<F>::Update(vec![what_enc]),
- RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(who.len())
- .with_timeout(TABLE_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(who.len()),
)
.await?;
Ok(())
@@ -378,8 +423,20 @@ where
let value = self.data.read_entry(key, sort_key)?;
Ok(TableRpc::ReadEntryResponse(value))
}
- TableRpc::ReadRange(key, begin_sort_key, filter, limit) => {
- let values = self.data.read_range(key, begin_sort_key, filter, *limit)?;
+ TableRpc::ReadRange {
+ partition,
+ begin_sort_key,
+ filter,
+ limit,
+ enumeration_order,
+ } => {
+ let values = self.data.read_range(
+ partition,
+ begin_sort_key,
+ filter,
+ *limit,
+ *enumeration_order,
+ )?;
Ok(TableRpc::Update(values))
}
TableRpc::Update(pairs) => {
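
In table.rs, `insert_many` becomes generic over `IntoIterator<Item = IE>` with `IE: Borrow<F::E>`, so callers can pass owned entries, slices, or iterators of references without cloning. A small stand-alone sketch of that `Borrow`-based signature follows; the `Entry` type and function body are illustrative, whereas the real implementation serializes each entry and groups it by destination node.

use std::borrow::Borrow;

#[derive(Debug, Clone)]
struct Entry {
    key: String,
}

fn insert_many<I, IE>(entries: I) -> usize
where
    I: IntoIterator<Item = IE>,
    IE: Borrow<Entry>,
{
    let mut n = 0;
    for entry in entries {
        // Borrow gives us a &Entry whether the item is owned or a reference.
        let entry: &Entry = entry.borrow();
        println!("inserting {}", entry.key);
        n += 1;
    }
    n
}

fn main() {
    let owned = vec![
        Entry { key: "a".to_string() },
        Entry { key: "b".to_string() },
    ];
    // Works with an iterator of references...
    assert_eq!(insert_many(owned.iter()), 2);
    // ...and with owned values.
    assert_eq!(insert_many(owned), 2);
}
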
diff --git a/src/table/util.rs b/src/table/util.rs
index 2a5c3afe..20595a94 100644
--- a/src/table/util.rs
+++ b/src/table/util.rs
@@ -17,7 +17,7 @@ impl PartitionKey for EmptyKey {
}
}
-#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
+#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum DeletedFilter {
Any,
Deleted,
@@ -33,3 +33,19 @@ impl DeletedFilter {
}
}
}
+
+#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
+pub enum EnumerationOrder {
+ Forward,
+ Reverse,
+}
+
+impl EnumerationOrder {
+ pub fn from_reverse(reverse: bool) -> Self {
+ if reverse {
+ Self::Reverse
+ } else {
+ Self::Forward
+ }
+ }
+}
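
The `Reverse` variant added here is consumed by `read_range` in data.rs above: when no start key is given, the exclusive upper bound for `range_rev` is built by incrementing the low 128 bits of the 32-byte partition hash. A self-contained sketch of that bound computation follows; the function name and the `main` driver are illustrative.

use std::convert::TryInto;

// Build the exclusive upper bound used for reverse enumeration: keep the high
// 128 bits of the partition hash and add one to the low 128 bits, so that
// range_rev(..last_key) covers every key beginning with the partition hash.
// Assumes the low bits are not all 0xff (a full carry is not handled, as in the patch).
fn reverse_upper_bound(partition_hash: &[u8; 32]) -> Vec<u8> {
    let mut last_key = partition_hash.to_vec();
    let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap());
    last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1));
    last_key
}

fn main() {
    let mut hash = [0u8; 32];
    hash[31] = 7;
    let bound = reverse_upper_bound(&hash);
    assert_eq!(bound[31], 8); // low half incremented by one
    assert_eq!(&bound[..16], &hash[..16]); // high half unchanged
}
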