diff options
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/Cargo.toml | 10 | ||||
-rw-r--r-- | src/table/data.rs | 187 | ||||
-rw-r--r-- | src/table/gc.rs | 139 | ||||
-rw-r--r-- | src/table/merkle.rs | 150 | ||||
-rw-r--r-- | src/table/metrics.rs | 21 | ||||
-rw-r--r-- | src/table/schema.rs | 21 | ||||
-rw-r--r-- | src/table/sync.rs | 233 | ||||
-rw-r--r-- | src/table/table.rs | 143 | ||||
-rw-r--r-- | src/table/util.rs | 18 |
9 files changed, 551 insertions, 371 deletions
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index ed1a213f..38c6b41c 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_table" -version = "0.7.0" +version = "0.8.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -14,19 +14,19 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_rpc = { version = "0.7.0", path = "../rpc" } -garage_util = { version = "0.7.0", path = "../util" } +garage_db = { version = "0.8.0", path = "../db" } +garage_rpc = { version = "0.8.0", path = "../rpc" } +garage_util = { version = "0.8.0", path = "../util" } opentelemetry = "0.17" async-trait = "0.1.7" bytes = "1.0" +hex = "0.4" hexdump = "0.1" tracing = "0.1.30" rand = "0.8" -sled = "0.34" - rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/table/data.rs b/src/table/data.rs index ff7965f5..3212e82b 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -1,13 +1,15 @@ use core::borrow::Borrow; +use std::convert::TryInto; use std::sync::Arc; use serde_bytes::ByteBuf; -use sled::Transactional; use tokio::sync::Notify; +use garage_db as db; +use garage_db::counted_tree_hack::CountedTree; + use garage_util::data::*; use garage_util::error::*; -use garage_util::sled_counter::SledCountedTree; use garage_rpc::system::System; @@ -16,19 +18,20 @@ use crate::gc::GcTodoEntry; use crate::metrics::*; use crate::replication::*; use crate::schema::*; +use crate::util::*; pub struct TableData<F: TableSchema, R: TableReplication> { system: Arc<System>, - pub(crate) instance: F, - pub(crate) replication: R, + pub instance: F, + pub replication: R, - pub store: sled::Tree, + pub store: db::Tree, - pub(crate) merkle_tree: sled::Tree, - pub(crate) merkle_todo: sled::Tree, + pub(crate) merkle_tree: db::Tree, + pub(crate) merkle_todo: db::Tree, pub(crate) merkle_todo_notify: Notify, - pub(crate) gc_todo: SledCountedTree, + pub(crate) gc_todo: CountedTree, pub(crate) metrics: TableMetrics, } @@ -38,7 +41,7 @@ where F: TableSchema, R: TableReplication, { - pub fn new(system: Arc<System>, instance: F, replication: R, db: &sled::Db) -> Arc<Self> { + pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> { let store = db .open_tree(&format!("{}:table", F::TABLE_NAME)) .expect("Unable to open DB tree"); @@ -53,7 +56,7 @@ where let gc_todo = db .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) .expect("Unable to open DB tree"); - let gc_todo = SledCountedTree::new(gc_todo); + let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2"); let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone()); @@ -83,18 +86,48 @@ where pub fn read_range( &self, - p: &F::P, - s: &Option<F::S>, + partition_key: &F::P, + start: &Option<F::S>, + filter: &Option<F::Filter>, + limit: usize, + enumeration_order: EnumerationOrder, + ) -> Result<Vec<Arc<ByteBuf>>, Error> { + let partition_hash = partition_key.hash(); + match enumeration_order { + EnumerationOrder::Forward => { + let first_key = match start { + None => partition_hash.to_vec(), + Some(sk) => self.tree_key(partition_key, sk), + }; + let range = self.store.range(first_key..)?; + self.read_range_aux(partition_hash, range, filter, limit) + } + EnumerationOrder::Reverse => match start { + Some(sk) => { + let last_key = self.tree_key(partition_key, sk); + let range = self.store.range_rev(..=last_key)?; + self.read_range_aux(partition_hash, range, filter, limit) + } + None => { + let mut last_key = partition_hash.to_vec(); + let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap()); + last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1)); + let range = self.store.range_rev(..last_key)?; + self.read_range_aux(partition_hash, range, filter, limit) + } + }, + } + } + + fn read_range_aux<'a>( + &self, + partition_hash: Hash, + range: db::ValueIter<'a>, filter: &Option<F::Filter>, limit: usize, ) -> Result<Vec<Arc<ByteBuf>>, Error> { - let partition_hash = p.hash(); - let first_key = match s { - None => partition_hash.to_vec(), - Some(sk) => self.tree_key(p, sk), - }; let mut ret = vec![]; - for item in self.store.range(first_key..) { + for item in range { let (key, value) = item?; if &key[..32] != partition_hash.as_slice() { break; @@ -107,7 +140,7 @@ where } }; if keep { - ret.push(Arc::new(ByteBuf::from(value.as_ref()))); + ret.push(Arc::new(ByteBuf::from(value))); } if ret.len() >= limit { break; @@ -136,17 +169,29 @@ where let update = self.decode_entry(update_bytes)?; let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - let (old_entry, old_bytes, new_entry) = match store.get(&tree_key)? { + self.update_entry_with(&tree_key[..], |ent| match ent { + Some(mut ent) => { + ent.merge(&update); + ent + } + None => update.clone(), + })?; + Ok(()) + } + + pub fn update_entry_with( + &self, + tree_key: &[u8], + f: impl Fn(Option<F::E>) -> F::E, + ) -> Result<Option<F::E>, Error> { + let changed = self.store.db().transaction(|mut tx| { + let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? { Some(old_bytes) => { - let old_entry = self - .decode_entry(&old_bytes) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let mut new_entry = old_entry.clone(); - new_entry.merge(&update); + let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; + let new_entry = f(Some(old_entry.clone())); (Some(old_entry), Some(old_bytes), new_entry) } - None => (None, None, update.clone()), + None => (None, None, f(None)), }; // Scenario 1: the value changed, so of course there is a change @@ -158,24 +203,28 @@ where // the associated Merkle tree entry. let new_bytes = rmp_to_vec_all_named(&new_entry) .map_err(Error::RmpEncode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + .map_err(db::TxError::Abort)?; let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]); + drop(old_bytes); if value_changed || encoding_changed { let new_bytes_hash = blake2sum(&new_bytes[..]); - mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?; - store.insert(tree_key.clone(), new_bytes)?; - Ok(Some((old_entry, new_entry, new_bytes_hash))) + tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?; + tx.insert(&self.store, tree_key, new_bytes)?; + + self.instance + .updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?; + + Ok(Some((new_entry, new_bytes_hash))) } else { Ok(None) } })?; - if let Some((old_entry, new_entry, new_bytes_hash)) = changed { + if let Some((new_entry, new_bytes_hash)) = changed { self.metrics.internal_update_counter.add(1); let is_tombstone = new_entry.is_tombstone(); - self.instance.updated(old_entry, Some(new_entry)); self.merkle_todo_notify.notify_one(); if is_tombstone { // We are only responsible for GC'ing this item if we are the @@ -187,31 +236,34 @@ where let pk_hash = Hash::try_from(&tree_key[..32]).unwrap(); let nodes = self.replication.write_nodes(&pk_hash); if nodes.first() == Some(&self.system.id) { - GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?; + GcTodoEntry::new(tree_key.to_vec(), new_bytes_hash).save(&self.gc_todo)?; } } - } - Ok(()) + Ok(Some(new_entry)) + } else { + Ok(None) + } } pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { - let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { - if cur_v == v { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(true); + let removed = self + .store + .db() + .transaction(|mut tx| match tx.get(&self.store, k)? { + Some(cur_v) if cur_v == v => { + tx.remove(&self.store, k)?; + tx.insert(&self.merkle_todo, k, vec![])?; + + let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?; + self.instance.updated(&mut tx, Some(&old_entry), None)?; + Ok(true) } - } - Ok(false) - })?; + _ => Ok(false), + })?; if removed { self.metrics.internal_delete_counter.add(1); - - let old_entry = self.decode_entry(v)?; - self.instance.updated(Some(old_entry), None); self.merkle_todo_notify.notify_one(); } Ok(removed) @@ -222,36 +274,37 @@ where k: &[u8], vhash: Hash, ) -> Result<bool, Error> { - let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { - if blake2sum(&cur_v[..]) == vhash { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(Some(cur_v)); + let removed = self + .store + .db() + .transaction(|mut tx| match tx.get(&self.store, k)? { + Some(cur_v) if blake2sum(&cur_v[..]) == vhash => { + tx.remove(&self.store, k)?; + tx.insert(&self.merkle_todo, k, vec![])?; + + let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?; + self.instance.updated(&mut tx, Some(&old_entry), None)?; + Ok(true) } - } - Ok(None) - })?; + _ => Ok(false), + })?; - if let Some(old_v) = removed { - let old_entry = self.decode_entry(&old_v[..])?; - self.instance.updated(Some(old_entry), None); + if removed { + self.metrics.internal_delete_counter.add(1); self.merkle_todo_notify.notify_one(); - Ok(true) - } else { - Ok(false) } + Ok(removed) } // ---- Utility functions ---- - pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { + pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { let mut ret = p.hash().to_vec(); ret.extend(s.sort_key()); ret } - pub(crate) fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> { + pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> { match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { Ok(x) => Ok(x), Err(e) => match F::try_migrate(bytes) { @@ -267,7 +320,7 @@ where } } - pub fn gc_todo_len(&self) -> usize { - self.gc_todo.len() + pub fn gc_todo_len(&self) -> Result<usize, Error> { + Ok(self.gc_todo.len()) } } diff --git a/src/table/gc.rs b/src/table/gc.rs index 2a05b6ae..83e7eeff 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -8,13 +8,13 @@ use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use futures::future::join_all; -use futures::select; -use futures_util::future::*; use tokio::sync::watch; +use garage_db::counted_tree_hack::CountedTree; + +use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; -use garage_util::sled_counter::SledCountedTree; use garage_util::time::*; use garage_rpc::system::System; @@ -25,7 +25,6 @@ use crate::replication::*; use crate::schema::*; const TABLE_GC_BATCH_SIZE: usize = 1024; -const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); // GC delay for table entries: 1 day (24 hours) // (the delay before the entry is added in the GC todo list @@ -68,50 +67,24 @@ where gc.endpoint.set_handler(gc.clone()); - let gc1 = gc.clone(); - system.background.spawn_worker( - format!("GC loop for {}", F::TABLE_NAME), - move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit), - ); + system.background.spawn_worker(GcWorker::new(gc.clone())); gc } - async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { - while !*must_exit.borrow() { - match self.gc_loop_iter().await { - Ok(None) => { - // Stuff was done, loop immediately - } - Ok(Some(wait_delay)) => { - // Nothing was done, wait specified delay. - select! { - _ = tokio::time::sleep(wait_delay).fuse() => {}, - _ = must_exit.changed().fuse() => {}, - } - } - Err(e) => { - warn!("({}) Error doing GC: {}", F::TABLE_NAME, e); - } - } - } - } - async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> { let now = now_msec(); - let mut entries = vec![]; - let mut excluded = vec![]; - // List entries in the GC todo list // These entries are put there when a tombstone is inserted in the table // (see update_entry in data.rs) - for entry_kv in self.data.gc_todo.iter() { + let mut candidates = vec![]; + for entry_kv in self.data.gc_todo.iter()? { let (k, vhash) = entry_kv?; - let mut todo_entry = GcTodoEntry::parse(&k, &vhash); + let todo_entry = GcTodoEntry::parse(&k, &vhash); if todo_entry.deletion_time() > now { - if entries.is_empty() && excluded.is_empty() { + if candidates.is_empty() { // If the earliest entry in the todo list shouldn't yet be processed, // return a duration to wait in the loop return Ok(Some(Duration::from_millis( @@ -123,15 +96,23 @@ where } } - let vhash = Hash::try_from(&vhash[..]).unwrap(); + candidates.push(todo_entry); + if candidates.len() >= 2 * TABLE_GC_BATCH_SIZE { + break; + } + } + let mut entries = vec![]; + let mut excluded = vec![]; + for mut todo_entry in candidates { // Check if the tombstone is still the current value of the entry. // If not, we don't actually want to GC it, and we will remove it // from the gc_todo table later (below). + let vhash = todo_entry.value_hash; todo_entry.value = self .data .store - .get(&k[..])? + .get(&todo_entry.key[..])? .filter(|v| blake2sum(&v[..]) == vhash) .map(|v| v.to_vec()); @@ -254,9 +235,7 @@ where &self.endpoint, &nodes[..], GcRpc::Update(updates), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_quorum(nodes.len()) - .with_timeout(TABLE_GC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()), ) .await .err_context("GC: send tombstones")?; @@ -277,9 +256,7 @@ where &self.endpoint, &nodes[..], GcRpc::DeleteIfEqualHash(deletes), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_quorum(nodes.len()) - .with_timeout(TABLE_GC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()), ) .await .err_context("GC: remote delete tombstones")?; @@ -321,6 +298,66 @@ where } } +struct GcWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + gc: Arc<TableGc<F, R>>, + wait_delay: Duration, +} + +impl<F, R> GcWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn new(gc: Arc<TableGc<F, R>>) -> Self { + Self { + gc, + wait_delay: Duration::from_secs(0), + } + } +} + +#[async_trait] +impl<F, R> Worker for GcWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn name(&self) -> String { + format!("{} GC", F::TABLE_NAME) + } + + fn info(&self) -> Option<String> { + let l = self.gc.data.gc_todo_len().unwrap_or(0); + if l > 0 { + Some(format!("{} items in queue", l)) + } else { + None + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + match self.gc.gc_loop_iter().await? { + None => Ok(WorkerState::Busy), + Some(delay) => { + self.wait_delay = delay; + Ok(WorkerState::Idle) + } + } + } + + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { + if *must_exit.borrow() { + return WorkerState::Done; + } + tokio::time::sleep(self.wait_delay).await; + WorkerState::Busy + } +} + /// An entry stored in the gc_todo Sled tree associated with the table /// Contains helper function for parsing, saving, and removing /// such entry in Sled @@ -353,17 +390,17 @@ impl GcTodoEntry { } /// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree - pub(crate) fn parse(sled_k: &[u8], sled_v: &[u8]) -> Self { + pub(crate) fn parse(db_k: &[u8], db_v: &[u8]) -> Self { Self { - tombstone_timestamp: u64::from_be_bytes(sled_k[0..8].try_into().unwrap()), - key: sled_k[8..].to_vec(), - value_hash: Hash::try_from(sled_v).unwrap(), + tombstone_timestamp: u64::from_be_bytes(db_k[0..8].try_into().unwrap()), + key: db_k[8..].to_vec(), + value_hash: Hash::try_from(db_v).unwrap(), value: None, } } /// Saves the GcTodoEntry in the gc_todo tree - pub(crate) fn save(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> { + pub(crate) fn save(&self, gc_todo_tree: &CountedTree) -> Result<(), Error> { gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?; Ok(()) } @@ -373,9 +410,9 @@ impl GcTodoEntry { /// This is usefull to remove a todo entry only under the condition /// that it has not changed since the time it was read, i.e. /// what we have to do is still the same - pub(crate) fn remove_if_equal(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> { - let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>( - &self.todo_table_key()[..], + pub(crate) fn remove_if_equal(&self, gc_todo_tree: &CountedTree) -> Result<(), Error> { + gc_todo_tree.compare_and_swap::<_, _, &[u8]>( + &self.todo_table_key(), Some(self.value_hash), None, )?; diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 93bf7e47..a5c29723 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -1,15 +1,13 @@ use std::sync::Arc; use std::time::Duration; -use futures::select; -use futures_util::future::*; +use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use sled::transaction::{ - ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree, -}; use tokio::sync::watch; -use garage_util::background::BackgroundRunner; +use garage_db as db; + +use garage_util::background::*; use garage_util::data::*; use garage_util::error::Error; @@ -79,43 +77,17 @@ where empty_node_hash, }); - let ret2 = ret.clone(); - background.spawn_worker( - format!("Merkle tree updater for {}", F::TABLE_NAME), - |must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit), - ); + background.spawn_worker(MerkleWorker(ret.clone())); ret } - async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { - while !*must_exit.borrow() { - if let Some(x) = self.data.merkle_todo.iter().next() { - match x { - Ok((key, valhash)) => { - if let Err(e) = self.update_item(&key[..], &valhash[..]) { - warn!( - "({}) Error while updating Merkle tree item: {}", - F::TABLE_NAME, - e - ); - } - } - Err(e) => { - warn!( - "({}) Error while iterating on Merkle todo tree: {}", - F::TABLE_NAME, - e - ); - tokio::time::sleep(Duration::from_secs(10)).await; - } - } - } else { - select! { - _ = self.data.merkle_todo_notify.notified().fuse() => {}, - _ = must_exit.changed().fuse() => {}, - } - } + fn updater_loop_iter(&self) -> Result<WorkerState, Error> { + if let Some((key, valhash)) = self.data.merkle_todo.first()? { + self.update_item(&key, &valhash)?; + Ok(WorkerState::Busy) + } else { + Ok(WorkerState::Idle) } } @@ -137,13 +109,16 @@ where }; self.data .merkle_tree - .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?; + .db() + .transaction(|mut tx| self.update_item_rec(&mut tx, k, &khash, &key, new_vhash))?; - let deleted = self - .data - .merkle_todo - .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)? - .is_ok(); + let deleted = self.data.merkle_todo.db().transaction(|mut tx| { + let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by); + if remove { + tx.remove(&self.data.merkle_todo, k)?; + } + Ok(remove) + })?; if !deleted { debug!( @@ -157,12 +132,12 @@ where fn update_item_rec( &self, - tx: &TransactionalTree, + tx: &mut db::Transaction<'_>, k: &[u8], khash: &Hash, key: &MerkleNodeKey, new_vhash: Option<Hash>, - ) -> ConflictableTransactionResult<Option<Hash>, Error> { + ) -> db::TxResult<Option<Hash>, Error> { let i = key.prefix.len(); // Read node at current position (defined by the prefix stored in key) @@ -203,7 +178,7 @@ where } MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)), x @ MerkleNode::Leaf(_, _) => { - tx.remove(key_sub.encode())?; + tx.remove(&self.data.merkle_tree, key_sub.encode())?; Some(x) } } @@ -283,28 +258,27 @@ where fn read_node_txn( &self, - tx: &TransactionalTree, + tx: &mut db::Transaction<'_>, k: &MerkleNodeKey, - ) -> ConflictableTransactionResult<MerkleNode, Error> { - let ent = tx.get(k.encode())?; - MerkleNode::decode_opt(ent).map_err(ConflictableTransactionError::Abort) + ) -> db::TxResult<MerkleNode, Error> { + let ent = tx.get(&self.data.merkle_tree, k.encode())?; + MerkleNode::decode_opt(&ent).map_err(db::TxError::Abort) } fn put_node_txn( &self, - tx: &TransactionalTree, + tx: &mut db::Transaction<'_>, k: &MerkleNodeKey, v: &MerkleNode, - ) -> ConflictableTransactionResult<Hash, Error> { + ) -> db::TxResult<Hash, Error> { trace!("Put Merkle node: {:?} => {:?}", k, v); if *v == MerkleNode::Empty { - tx.remove(k.encode())?; + tx.remove(&self.data.merkle_tree, k.encode())?; Ok(self.empty_node_hash) } else { - let vby = rmp_to_vec_all_named(v) - .map_err(|e| ConflictableTransactionError::Abort(e.into()))?; + let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?; let rethash = blake2sum(&vby[..]); - tx.insert(k.encode(), vby)?; + tx.insert(&self.data.merkle_tree, k.encode(), vby)?; Ok(rethash) } } @@ -312,15 +286,63 @@ where // Access a node in the Merkle tree, used by the sync protocol pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result<MerkleNode, Error> { let ent = self.data.merkle_tree.get(k.encode())?; - MerkleNode::decode_opt(ent) + MerkleNode::decode_opt(&ent) + } + + pub fn merkle_tree_len(&self) -> Result<usize, Error> { + Ok(self.data.merkle_tree.len()?) } - pub fn merkle_tree_len(&self) -> usize { - self.data.merkle_tree.len() + pub fn todo_len(&self) -> Result<usize, Error> { + Ok(self.data.merkle_todo.len()?) } +} + +struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>) +where + F: TableSchema + 'static, + R: TableReplication + 'static; - pub fn todo_len(&self) -> usize { - self.data.merkle_todo.len() +#[async_trait] +impl<F, R> Worker for MerkleWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn name(&self) -> String { + format!("{} Merkle tree updater", F::TABLE_NAME) + } + + fn info(&self) -> Option<String> { + let l = self.0.todo_len().unwrap_or(0); + if l > 0 { + Some(format!("{} items in queue", l)) + } else { + None + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + let updater = self.0.clone(); + tokio::task::spawn_blocking(move || { + for _i in 0..100 { + let s = updater.updater_loop_iter(); + if !matches!(s, Ok(WorkerState::Busy)) { + return s; + } + } + Ok(WorkerState::Busy) + }) + .await + .unwrap() + } + + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { + if *must_exit.borrow() { + return WorkerState::Done; + } + tokio::time::sleep(Duration::from_secs(10)).await; + WorkerState::Busy } } @@ -347,7 +369,7 @@ impl MerkleNodeKey { } impl MerkleNode { - fn decode_opt(ent: Option<sled::IVec>) -> Result<Self, Error> { + fn decode_opt(ent: &Option<db::Value>) -> Result<Self, Error> { match ent { None => Ok(MerkleNode::Empty), Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), diff --git a/src/table/metrics.rs b/src/table/metrics.rs index 752a2a6d..3a1783e0 100644 --- a/src/table/metrics.rs +++ b/src/table/metrics.rs @@ -1,6 +1,7 @@ use opentelemetry::{global, metrics::*, KeyValue}; -use garage_util::sled_counter::SledCountedTree; +use garage_db as db; +use garage_db::counted_tree_hack::CountedTree; /// TableMetrics reference all counter used for metrics pub struct TableMetrics { @@ -19,21 +20,19 @@ pub struct TableMetrics { pub(crate) sync_items_received: Counter<u64>, } impl TableMetrics { - pub fn new( - table_name: &'static str, - merkle_todo: sled::Tree, - gc_todo: SledCountedTree, - ) -> Self { + pub fn new(table_name: &'static str, merkle_todo: db::Tree, gc_todo: CountedTree) -> Self { let meter = global::meter(table_name); TableMetrics { _merkle_todo_len: meter .u64_value_observer( "table.merkle_updater_todo_queue_length", move |observer| { - observer.observe( - merkle_todo.len() as u64, - &[KeyValue::new("table_name", table_name)], - ) + if let Ok(v) = merkle_todo.len() { + observer.observe( + v as u64, + &[KeyValue::new("table_name", table_name)], + ); + } }, ) .with_description("Merkle tree updater TODO queue length") @@ -45,7 +44,7 @@ impl TableMetrics { observer.observe( gc_todo.len() as u64, &[KeyValue::new("table_name", table_name)], - ) + ); }, ) .with_description("Table garbage collector TODO queue length") diff --git a/src/table/schema.rs b/src/table/schema.rs index eba918a2..f37e98d8 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -1,5 +1,6 @@ use serde::{Deserialize, Serialize}; +use garage_db as db; use garage_util::data::*; use crate::crdt::Crdt; @@ -59,7 +60,7 @@ pub trait Entry<P: PartitionKey, S: SortKey>: } /// Trait for the schema used in a table -pub trait TableSchema: Send + Sync { +pub trait TableSchema: Send + Sync + 'static { /// The name of the table in the database const TABLE_NAME: &'static str; @@ -82,11 +83,19 @@ pub trait TableSchema: Send + Sync { None } - // Updated triggers some stuff downstream, but it is not supposed to block or fail, - // as the update itself is an unchangeable fact that will never go back - // due to CRDT logic. Typically errors in propagation of info should be logged - // to stderr. - fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {} + /// Actions triggered by data changing in a table. If such actions + /// include updates to the local database that should be applied + /// atomically with the item update itself, a db transaction is + /// provided on which these changes should be done. + /// This function can return a DB error but that's all. + fn updated( + &self, + _tx: &mut db::Transaction, + _old: Option<&Self::E>, + _new: Option<&Self::E>, + ) -> db::TxOpResult<()> { + Ok(()) + } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool; } diff --git a/src/table/sync.rs b/src/table/sync.rs index 08069ad0..9d79d856 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -1,17 +1,17 @@ use std::collections::VecDeque; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; -use futures::select; -use futures_util::future::*; use futures_util::stream::*; use opentelemetry::KeyValue; use rand::Rng; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; +use tokio::select; use tokio::sync::{mpsc, watch}; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::Error; @@ -24,8 +24,6 @@ use crate::merkle::*; use crate::replication::*; use crate::*; -const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30); - // Do anti-entropy every 10 minutes const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); @@ -34,7 +32,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> data: Arc<TableData<F, R>>, merkle: Arc<MerkleUpdater<F, R>>, - todo: Mutex<SyncTodo>, + add_full_sync_tx: mpsc::UnboundedSender<()>, endpoint: Arc<Endpoint<SyncRpc, Self>>, } @@ -52,10 +50,6 @@ impl Rpc for SyncRpc { type Response = Result<SyncRpc, Error>; } -struct SyncTodo { - todo: Vec<TodoPartition>, -} - #[derive(Debug, Clone)] struct TodoPartition { partition: Partition, @@ -80,118 +74,40 @@ where .netapp .endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME)); - let todo = SyncTodo { todo: vec![] }; + let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); let syncer = Arc::new(Self { system: system.clone(), data, merkle, - todo: Mutex::new(todo), + add_full_sync_tx, endpoint, }); syncer.endpoint.set_handler(syncer.clone()); - let (busy_tx, busy_rx) = mpsc::unbounded_channel(); - - let s1 = syncer.clone(); - system.background.spawn_worker( - format!("table sync watcher for {}", F::TABLE_NAME), - move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx), - ); - - let s2 = syncer.clone(); - system.background.spawn_worker( - format!("table syncer for {}", F::TABLE_NAME), - move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx), - ); - - let s3 = syncer.clone(); - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(20)).await; - s3.add_full_sync(); + system.background.spawn_worker(SyncWorker { + syncer: syncer.clone(), + ring_recv: system.ring.clone(), + ring: system.ring.borrow().clone(), + add_full_sync_rx, + todo: vec![], + next_full_sync: Instant::now() + Duration::from_secs(20), }); syncer } - async fn watcher_task( - self: Arc<Self>, - mut must_exit: watch::Receiver<bool>, - mut busy_rx: mpsc::UnboundedReceiver<bool>, - ) { - let mut prev_ring: Arc<Ring> = self.system.ring.borrow().clone(); - let mut ring_recv: watch::Receiver<Arc<Ring>> = self.system.ring.clone(); - let mut nothing_to_do_since = Some(Instant::now()); - - while !*must_exit.borrow() { - select! { - _ = ring_recv.changed().fuse() => { - let new_ring = ring_recv.borrow(); - if !Arc::ptr_eq(&new_ring, &prev_ring) { - debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME); - self.add_full_sync(); - prev_ring = new_ring.clone(); - } - } - busy_opt = busy_rx.recv().fuse() => { - if let Some(busy) = busy_opt { - if busy { - nothing_to_do_since = None; - } else if nothing_to_do_since.is_none() { - nothing_to_do_since = Some(Instant::now()); - } - } - } - _ = must_exit.changed().fuse() => {}, - _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { - if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) { - nothing_to_do_since = None; - debug!("({}) Interval passed, adding full sync to syncer todo list", F::TABLE_NAME); - self.add_full_sync(); - } - } - } - } - } - pub fn add_full_sync(&self) { - self.todo - .lock() - .unwrap() - .add_full_sync(&self.data, &self.system); - } - - async fn syncer_task( - self: Arc<Self>, - mut must_exit: watch::Receiver<bool>, - busy_tx: mpsc::UnboundedSender<bool>, - ) { - while !*must_exit.borrow() { - let task = self.todo.lock().unwrap().pop_task(); - if let Some(partition) = task { - busy_tx.send(true).unwrap(); - let res = self - .clone() - .sync_partition(&partition, &mut must_exit) - .await; - if let Err(e) = res { - warn!( - "({}) Error while syncing {:?}: {}", - F::TABLE_NAME, - partition, - e - ); - } - } else { - busy_tx.send(false).unwrap(); - tokio::time::sleep(Duration::from_secs(1)).await; - } + if self.add_full_sync_tx.send(()).is_err() { + error!("({}) Could not add full sync", F::TABLE_NAME); } } + // ---- + async fn sync_partition( - self: Arc<Self>, + self: &Arc<Self>, partition: &TodoPartition, must_exit: &mut watch::Receiver<bool>, ) -> Result<(), Error> { @@ -258,9 +174,9 @@ where while !*must_exit.borrow() { let mut items = Vec::new(); - for item in self.data.store.range(begin.to_vec()..end.to_vec()) { + for item in self.data.store.range(begin.to_vec()..end.to_vec())? { let (key, value) = item?; - items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref())))); + items.push((key.to_vec(), Arc::new(ByteBuf::from(value)))); if items.len() >= 1024 { break; @@ -329,9 +245,7 @@ where &self.endpoint, nodes, SyncRpc::Items(values), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_quorum(nodes.len()) - .with_timeout(TABLE_SYNC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()), ) .await?; @@ -392,8 +306,7 @@ where &self.endpoint, who, SyncRpc::RootCkHash(partition.partition, root_ck_hash), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_timeout(TABLE_SYNC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND), ) .await?; @@ -432,11 +345,11 @@ where // Just send that item directly if let Some(val) = self.data.store.get(&ik[..])? { if blake2sum(&val[..]) != ivhash { - warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik); + debug!("({}) Hashes differ between stored value and Merkle tree, key: {} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, hex::encode(ik)); } todo_items.push(val.to_vec()); } else { - warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik); + debug!("({}) Item from Merkle tree not found in store: {} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, hex::encode(ik)); } } MerkleNode::Intermediate(l) => { @@ -449,8 +362,7 @@ where &self.endpoint, who, SyncRpc::GetNode(key.clone()), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_timeout(TABLE_SYNC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND), ) .await? { @@ -526,8 +438,7 @@ where &self.endpoint, who, SyncRpc::Items(values), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_timeout(TABLE_SYNC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND), ) .await?; if let SyncRpc::Ok = rpc_resp { @@ -577,12 +488,22 @@ where } } -impl SyncTodo { - fn add_full_sync<F: TableSchema, R: TableReplication>( - &mut self, - data: &TableData<F, R>, - system: &System, - ) { +// -------- Sync Worker --------- + +struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> { + syncer: Arc<TableSyncer<F, R>>, + ring_recv: watch::Receiver<Arc<Ring>>, + ring: Arc<Ring>, + add_full_sync_rx: mpsc::UnboundedReceiver<()>, + todo: Vec<TodoPartition>, + next_full_sync: Instant, +} + +impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> { + fn add_full_sync(&mut self) { + let system = &self.syncer.system; + let data = &self.syncer.data; + let my_id = system.id; self.todo.clear(); @@ -603,8 +524,16 @@ impl SyncTodo { let retain = nodes.contains(&my_id); if !retain { // Check if we have some data to send, otherwise skip - if data.store.range(begin..end).next().is_none() { - continue; + match data.store.range(begin..end) { + Ok(mut iter) => { + if iter.next().is_none() { + continue; + } + } + Err(e) => { + warn!("DB error in add_full_sync: {}", e); + continue; + } } } @@ -615,6 +544,8 @@ impl SyncTodo { retain, }); } + + self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL; } fn pop_task(&mut self) -> Option<TodoPartition> { @@ -633,6 +564,62 @@ impl SyncTodo { } } +#[async_trait] +impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> { + fn name(&self) -> String { + format!("{} sync", F::TABLE_NAME) + } + + fn info(&self) -> Option<String> { + let l = self.todo.len(); + if l > 0 { + Some(format!("{} partitions remaining", l)) + } else { + None + } + } + + async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + if let Some(partition) = self.pop_task() { + self.syncer.sync_partition(&partition, must_exit).await?; + Ok(WorkerState::Busy) + } else { + Ok(WorkerState::Idle) + } + } + + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { + if *must_exit.borrow() { + return WorkerState::Done; + } + select! { + s = self.add_full_sync_rx.recv() => { + if let Some(()) = s { + self.add_full_sync(); + } + }, + _ = self.ring_recv.changed() => { + let new_ring = self.ring_recv.borrow(); + if !Arc::ptr_eq(&new_ring, &self.ring) { + self.ring = new_ring.clone(); + drop(new_ring); + debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME); + self.add_full_sync(); + } + }, + _ = tokio::time::sleep_until(self.next_full_sync.into()) => { + self.add_full_sync(); + } + } + match self.todo.is_empty() { + false => WorkerState::Busy, + true => WorkerState::Idle, + } + } +} + +// ---- UTIL ---- + fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> { Ok(blake2sum(&rmp_to_vec_all_named(x)?[..])) } diff --git a/src/table/table.rs b/src/table/table.rs index 7f87a449..8a66c420 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -1,6 +1,6 @@ -use std::collections::{BTreeMap, HashMap}; +use std::borrow::Borrow; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; -use std::time::Duration; use async_trait::async_trait; use futures::stream::*; @@ -12,6 +12,8 @@ use opentelemetry::{ Context, }; +use garage_db as db; + use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; @@ -26,8 +28,7 @@ use crate::merkle::*; use crate::replication::*; use crate::schema::*; use crate::sync::*; - -const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); +use crate::util::*; pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> { pub system: Arc<System>, @@ -45,7 +46,13 @@ pub(crate) enum TableRpc<F: TableSchema> { ReadEntryResponse(Option<ByteBuf>), // Read range: read all keys in partition P, possibly starting at a certain sort key offset - ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize), + ReadRange { + partition: F::P, + begin_sort_key: Option<F::S>, + filter: Option<F::Filter>, + limit: usize, + enumeration_order: EnumerationOrder, + }, Update(Vec<Arc<ByteBuf>>), } @@ -61,7 +68,7 @@ where { // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== - pub fn new(instance: F, replication: R, system: Arc<System>, db: &sled::Db) -> Arc<Self> { + pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> { let endpoint = system .netapp .endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME)); @@ -103,7 +110,6 @@ where async fn insert_internal(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); - //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = TableRpc::<F>::Update(vec![e_enc]); @@ -115,17 +121,20 @@ where &who[..], rpc, RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.data.replication.write_quorum()) - .with_timeout(TABLE_RPC_TIMEOUT), + .with_quorum(self.data.replication.write_quorum()), ) .await?; Ok(()) } - pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { + pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error> + where + I: IntoIterator<Item = IE> + Send + Sync, + IE: Borrow<F::E> + Send + Sync, + { let tracer = opentelemetry::global::tracer("garage_table"); - let span = tracer.start(format!("{} insert_many {}", F::TABLE_NAME, entries.len())); + let span = tracer.start(format!("{} insert_many", F::TABLE_NAME)); self.insert_many_internal(entries) .bound_record_duration(&self.data.metrics.put_request_duration) @@ -137,10 +146,15 @@ where Ok(()) } - async fn insert_many_internal(&self, entries: &[F::E]) -> Result<(), Error> { + async fn insert_many_internal<I, IE>(&self, entries: I) -> Result<(), Error> + where + I: IntoIterator<Item = IE> + Send + Sync, + IE: Borrow<F::E> + Send + Sync, + { let mut call_list: HashMap<_, Vec<_>> = HashMap::new(); - for entry in entries.iter() { + for entry in entries.into_iter() { + let entry = entry.borrow(); let hash = entry.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); @@ -159,7 +173,7 @@ where &self.endpoint, node, rpc, - RequestStrategy::with_priority(PRIO_NORMAL).with_timeout(TABLE_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_NORMAL), ) .await?; Ok::<_, Error>((node, resp)) @@ -216,7 +230,6 @@ where rpc, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.data.replication.read_quorum()) - .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) .await?; @@ -261,12 +274,19 @@ where begin_sort_key: Option<F::S>, filter: Option<F::Filter>, limit: usize, + enumeration_order: EnumerationOrder, ) -> Result<Vec<F::E>, Error> { let tracer = opentelemetry::global::tracer("garage_table"); let span = tracer.start(format!("{} get_range", F::TABLE_NAME)); let res = self - .get_range_internal(partition_key, begin_sort_key, filter, limit) + .get_range_internal( + partition_key, + begin_sort_key, + filter, + limit, + enumeration_order, + ) .bound_record_duration(&self.data.metrics.get_request_duration) .with_context(Context::current_with_span(span)) .await?; @@ -282,11 +302,18 @@ where begin_sort_key: Option<F::S>, filter: Option<F::Filter>, limit: usize, + enumeration_order: EnumerationOrder, ) -> Result<Vec<F::E>, Error> { let hash = partition_key.hash(); let who = self.data.replication.read_nodes(&hash); - let rpc = TableRpc::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); + let rpc = TableRpc::<F>::ReadRange { + partition: partition_key.clone(), + begin_sort_key, + filter, + limit, + enumeration_order, + }; let resps = self .system @@ -297,49 +324,69 @@ where rpc, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.data.replication.read_quorum()) - .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) .await?; - let mut ret = BTreeMap::new(); - let mut to_repair = BTreeMap::new(); + let mut ret: BTreeMap<Vec<u8>, F::E> = BTreeMap::new(); + let mut to_repair = BTreeSet::new(); for resp in resps { if let TableRpc::Update(entries) = resp { for entry_bytes in entries.iter() { let entry = self.data.decode_entry(entry_bytes.as_slice())?; let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key()); - match ret.remove(&entry_key) { - None => { - ret.insert(entry_key, Some(entry)); - } - Some(Some(mut prev)) => { - let must_repair = prev != entry; - prev.merge(&entry); - if must_repair { - to_repair.insert(entry_key.clone(), Some(prev.clone())); + match ret.get_mut(&entry_key) { + Some(e) => { + if *e != entry { + e.merge(&entry); + to_repair.insert(entry_key.clone()); } - ret.insert(entry_key, Some(prev)); } - Some(None) => unreachable!(), + None => { + ret.insert(entry_key, entry); + } } } + } else { + return Err(Error::unexpected_rpc_message(resp)); } } + if !to_repair.is_empty() { let self2 = self.clone(); + let to_repair = to_repair + .into_iter() + .map(|k| ret.get(&k).unwrap().clone()) + .collect::<Vec<_>>(); self.system.background.spawn_cancellable(async move { - for (_, v) in to_repair.iter_mut() { - self2.repair_on_read(&who[..], v.take().unwrap()).await?; + for v in to_repair { + self2.repair_on_read(&who[..], v).await?; } Ok(()) }); } - let ret_vec = ret - .iter_mut() - .take(limit) - .map(|(_k, v)| v.take().unwrap()) - .collect::<Vec<_>>(); + + // At this point, the `ret` btreemap might contain more than `limit` + // items, because nodes might have returned us each `limit` items + // but for different keys. We have to take only the first `limit` items + // in this map, in the specified enumeration order, for two reasons: + // 1. To return to the user no more than the number of items that they requested + // 2. To return only items for which we have a read quorum: we do not know + // that we have a read quorum for the items after the first `limit` + // of them + let ret_vec = match enumeration_order { + EnumerationOrder::Forward => ret + .into_iter() + .take(limit) + .map(|(_k, v)| v) + .collect::<Vec<_>>(), + EnumerationOrder::Reverse => ret + .into_iter() + .rev() + .take(limit) + .map(|(_k, v)| v) + .collect::<Vec<_>>(), + }; Ok(ret_vec) } @@ -353,9 +400,7 @@ where &self.endpoint, who, TableRpc::<F>::Update(vec![what_enc]), - RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(who.len()) - .with_timeout(TABLE_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(who.len()), ) .await?; Ok(()) @@ -378,8 +423,20 @@ where let value = self.data.read_entry(key, sort_key)?; Ok(TableRpc::ReadEntryResponse(value)) } - TableRpc::ReadRange(key, begin_sort_key, filter, limit) => { - let values = self.data.read_range(key, begin_sort_key, filter, *limit)?; + TableRpc::ReadRange { + partition, + begin_sort_key, + filter, + limit, + enumeration_order, + } => { + let values = self.data.read_range( + partition, + begin_sort_key, + filter, + *limit, + *enumeration_order, + )?; Ok(TableRpc::Update(values)) } TableRpc::Update(pairs) => { diff --git a/src/table/util.rs b/src/table/util.rs index 2a5c3afe..20595a94 100644 --- a/src/table/util.rs +++ b/src/table/util.rs @@ -17,7 +17,7 @@ impl PartitionKey for EmptyKey { } } -#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum DeletedFilter { Any, Deleted, @@ -33,3 +33,19 @@ impl DeletedFilter { } } } + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub enum EnumerationOrder { + Forward, + Reverse, +} + +impl EnumerationOrder { + pub fn from_reverse(reverse: bool) -> Self { + if reverse { + Self::Reverse + } else { + Self::Forward + } + } +} |