From 5768bf362262f78376af14517c4921941986192e Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 10 May 2022 13:16:57 +0200 Subject: First implementation of K2V (#293) **Specification:** View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md) - [x] Specify the structure of K2V triples - [x] Specify the DVVS format used for causality detection - [x] Specify the K2V index (just a counter of number of values per partition key) - [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem - [x] Specify index endpoint: ReadIndex - [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch - [x] Move to JSON objects instead of tuples - [x] Specify endpoints for polling for updates on single values (PollItem) **Implementation:** - [x] Table for K2V items, causal contexts - [x] Indexing mechanism and table for K2V index - [x] Make API handlers a bit more generic - [x] K2V API endpoint - [x] K2V API router - [x] ReadItem - [x] InsertItem - [x] DeleteItem - [x] PollItem - [x] ReadIndex - [x] InsertBatch - [x] ReadBatch - [x] DeleteBatch **Testing:** - [x] Just a simple Python script that does some requests to check visually that things are going right (does not contain parsing of results or assertions on returned values) - [x] Actual tests: - [x] Adapt testing framework - [x] Simple test with InsertItem + ReadItem - [x] Test with several Insert/Read/DeleteItem + ReadIndex - [x] Test all combinations of return formats for ReadItem - [x] Test with ReadBatch, InsertBatch, DeleteBatch - [x] Test with PollItem - [x] Test error codes - [ ] Fix most broken stuff - [x] test PollItem broken randomly - [x] when invalid causality tokens are given, errors should be 4xx not 5xx **Improvements:** - [x] Descending range queries - [x] Specify - [x] Implement - [x] Add test - [x] Batch updates to index counter - [x] Put K2V behind `k2v` feature flag Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293 Co-authored-by: Alex Co-committed-by: Alex --- src/table/data.rs | 98 +++++++++++++++++++++++++++++----------- src/table/schema.rs | 2 +- src/table/table.rs | 126 +++++++++++++++++++++++++++++++++++++++------------- src/table/util.rs | 18 +++++++- 4 files changed, 186 insertions(+), 58 deletions(-) (limited to 'src/table') diff --git a/src/table/data.rs b/src/table/data.rs index ff7965f5..5cb10066 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -1,8 +1,9 @@ use core::borrow::Borrow; +use std::convert::TryInto; use std::sync::Arc; use serde_bytes::ByteBuf; -use sled::Transactional; +use sled::{IVec, Transactional}; use tokio::sync::Notify; use garage_util::data::*; @@ -16,12 +17,13 @@ use crate::gc::GcTodoEntry; use crate::metrics::*; use crate::replication::*; use crate::schema::*; +use crate::util::*; pub struct TableData { system: Arc, - pub(crate) instance: F, - pub(crate) replication: R, + pub instance: F, + pub replication: R, pub store: sled::Tree, @@ -83,18 +85,48 @@ where pub fn read_range( &self, - p: &F::P, - s: &Option, + partition_key: &F::P, + start: &Option, + filter: &Option, + limit: usize, + enumeration_order: EnumerationOrder, + ) -> Result>, Error> { + let partition_hash = partition_key.hash(); + match enumeration_order { + EnumerationOrder::Forward => { + let first_key = match start { + None => partition_hash.to_vec(), + Some(sk) => self.tree_key(partition_key, sk), + }; + let range = self.store.range(first_key..); + self.read_range_aux(partition_hash, range, filter, limit) + } + EnumerationOrder::Reverse => match start { + Some(sk) => { + let last_key = self.tree_key(partition_key, sk); + let range = self.store.range(..=last_key).rev(); + self.read_range_aux(partition_hash, range, filter, limit) + } + None => { + let mut last_key = partition_hash.to_vec(); + let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap()); + last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1)); + let range = self.store.range(..last_key).rev(); + self.read_range_aux(partition_hash, range, filter, limit) + } + }, + } + } + + fn read_range_aux( + &self, + partition_hash: Hash, + range: impl Iterator>, filter: &Option, limit: usize, ) -> Result>, Error> { - let partition_hash = p.hash(); - let first_key = match s { - None => partition_hash.to_vec(), - Some(sk) => self.tree_key(p, sk), - }; let mut ret = vec![]; - for item in self.store.range(first_key..) { + for item in range { let (key, value) = item?; if &key[..32] != partition_hash.as_slice() { break; @@ -136,17 +168,31 @@ where let update = self.decode_entry(update_bytes)?; let tree_key = self.tree_key(update.partition_key(), update.sort_key()); + self.update_entry_with(&tree_key[..], |ent| match ent { + Some(mut ent) => { + ent.merge(&update); + ent + } + None => update.clone(), + })?; + Ok(()) + } + + pub fn update_entry_with( + &self, + tree_key: &[u8], + f: impl Fn(Option) -> F::E, + ) -> Result, Error> { let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - let (old_entry, old_bytes, new_entry) = match store.get(&tree_key)? { + let (old_entry, old_bytes, new_entry) = match store.get(tree_key)? { Some(old_bytes) => { let old_entry = self .decode_entry(&old_bytes) .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let mut new_entry = old_entry.clone(); - new_entry.merge(&update); + let new_entry = f(Some(old_entry.clone())); (Some(old_entry), Some(old_bytes), new_entry) } - None => (None, None, update.clone()), + None => (None, None, f(None)), }; // Scenario 1: the value changed, so of course there is a change @@ -163,8 +209,8 @@ where if value_changed || encoding_changed { let new_bytes_hash = blake2sum(&new_bytes[..]); - mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?; - store.insert(tree_key.clone(), new_bytes)?; + mkl_todo.insert(tree_key.to_vec(), new_bytes_hash.as_slice())?; + store.insert(tree_key.to_vec(), new_bytes)?; Ok(Some((old_entry, new_entry, new_bytes_hash))) } else { Ok(None) @@ -175,7 +221,7 @@ where self.metrics.internal_update_counter.add(1); let is_tombstone = new_entry.is_tombstone(); - self.instance.updated(old_entry, Some(new_entry)); + self.instance.updated(old_entry.as_ref(), Some(&new_entry)); self.merkle_todo_notify.notify_one(); if is_tombstone { // We are only responsible for GC'ing this item if we are the @@ -187,12 +233,14 @@ where let pk_hash = Hash::try_from(&tree_key[..32]).unwrap(); let nodes = self.replication.write_nodes(&pk_hash); if nodes.first() == Some(&self.system.id) { - GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?; + GcTodoEntry::new(tree_key.to_vec(), new_bytes_hash).save(&self.gc_todo)?; } } - } - Ok(()) + Ok(Some(new_entry)) + } else { + Ok(None) + } } pub(crate) fn delete_if_equal(self: &Arc, k: &[u8], v: &[u8]) -> Result { @@ -211,7 +259,7 @@ where self.metrics.internal_delete_counter.add(1); let old_entry = self.decode_entry(v)?; - self.instance.updated(Some(old_entry), None); + self.instance.updated(Some(&old_entry), None); self.merkle_todo_notify.notify_one(); } Ok(removed) @@ -235,7 +283,7 @@ where if let Some(old_v) = removed { let old_entry = self.decode_entry(&old_v[..])?; - self.instance.updated(Some(old_entry), None); + self.instance.updated(Some(&old_entry), None); self.merkle_todo_notify.notify_one(); Ok(true) } else { @@ -245,13 +293,13 @@ where // ---- Utility functions ---- - pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { + pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { let mut ret = p.hash().to_vec(); ret.extend(s.sort_key()); ret } - pub(crate) fn decode_entry(&self, bytes: &[u8]) -> Result { + pub fn decode_entry(&self, bytes: &[u8]) -> Result { match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { Ok(x) => Ok(x), Err(e) => match F::try_migrate(bytes) { diff --git a/src/table/schema.rs b/src/table/schema.rs index eba918a2..37327037 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -86,7 +86,7 @@ pub trait TableSchema: Send + Sync { // as the update itself is an unchangeable fact that will never go back // due to CRDT logic. Typically errors in propagation of info should be logged // to stderr. - fn updated(&self, _old: Option, _new: Option) {} + fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {} fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool; } diff --git a/src/table/table.rs b/src/table/table.rs index 7f87a449..2a167604 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -1,4 +1,5 @@ -use std::collections::{BTreeMap, HashMap}; +use std::borrow::Borrow; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; use std::time::Duration; @@ -26,8 +27,9 @@ use crate::merkle::*; use crate::replication::*; use crate::schema::*; use crate::sync::*; +use crate::util::*; -const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); +pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); pub struct Table { pub system: Arc, @@ -45,7 +47,13 @@ pub(crate) enum TableRpc { ReadEntryResponse(Option), // Read range: read all keys in partition P, possibly starting at a certain sort key offset - ReadRange(F::P, Option, Option, usize), + ReadRange { + partition: F::P, + begin_sort_key: Option, + filter: Option, + limit: usize, + enumeration_order: EnumerationOrder, + }, Update(Vec>), } @@ -123,9 +131,13 @@ where Ok(()) } - pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { + pub async fn insert_many(&self, entries: I) -> Result<(), Error> + where + I: IntoIterator + Send + Sync, + IE: Borrow + Send + Sync, + { let tracer = opentelemetry::global::tracer("garage_table"); - let span = tracer.start(format!("{} insert_many {}", F::TABLE_NAME, entries.len())); + let span = tracer.start(format!("{} insert_many", F::TABLE_NAME)); self.insert_many_internal(entries) .bound_record_duration(&self.data.metrics.put_request_duration) @@ -137,10 +149,15 @@ where Ok(()) } - async fn insert_many_internal(&self, entries: &[F::E]) -> Result<(), Error> { + async fn insert_many_internal(&self, entries: I) -> Result<(), Error> + where + I: IntoIterator + Send + Sync, + IE: Borrow + Send + Sync, + { let mut call_list: HashMap<_, Vec<_>> = HashMap::new(); - for entry in entries.iter() { + for entry in entries.into_iter() { + let entry = entry.borrow(); let hash = entry.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); @@ -261,12 +278,19 @@ where begin_sort_key: Option, filter: Option, limit: usize, + enumeration_order: EnumerationOrder, ) -> Result, Error> { let tracer = opentelemetry::global::tracer("garage_table"); let span = tracer.start(format!("{} get_range", F::TABLE_NAME)); let res = self - .get_range_internal(partition_key, begin_sort_key, filter, limit) + .get_range_internal( + partition_key, + begin_sort_key, + filter, + limit, + enumeration_order, + ) .bound_record_duration(&self.data.metrics.get_request_duration) .with_context(Context::current_with_span(span)) .await?; @@ -282,11 +306,18 @@ where begin_sort_key: Option, filter: Option, limit: usize, + enumeration_order: EnumerationOrder, ) -> Result, Error> { let hash = partition_key.hash(); let who = self.data.replication.read_nodes(&hash); - let rpc = TableRpc::::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); + let rpc = TableRpc::::ReadRange { + partition: partition_key.clone(), + begin_sort_key, + filter, + limit, + enumeration_order, + }; let resps = self .system @@ -302,44 +333,65 @@ where ) .await?; - let mut ret = BTreeMap::new(); - let mut to_repair = BTreeMap::new(); + let mut ret: BTreeMap, F::E> = BTreeMap::new(); + let mut to_repair = BTreeSet::new(); for resp in resps { if let TableRpc::Update(entries) = resp { for entry_bytes in entries.iter() { let entry = self.data.decode_entry(entry_bytes.as_slice())?; let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key()); - match ret.remove(&entry_key) { - None => { - ret.insert(entry_key, Some(entry)); - } - Some(Some(mut prev)) => { - let must_repair = prev != entry; - prev.merge(&entry); - if must_repair { - to_repair.insert(entry_key.clone(), Some(prev.clone())); + match ret.get_mut(&entry_key) { + Some(e) => { + if *e != entry { + e.merge(&entry); + to_repair.insert(entry_key.clone()); } - ret.insert(entry_key, Some(prev)); } - Some(None) => unreachable!(), + None => { + ret.insert(entry_key, entry); + } } } + } else { + return Err(Error::unexpected_rpc_message(resp)); } } + if !to_repair.is_empty() { let self2 = self.clone(); + let to_repair = to_repair + .into_iter() + .map(|k| ret.get(&k).unwrap().clone()) + .collect::>(); self.system.background.spawn_cancellable(async move { - for (_, v) in to_repair.iter_mut() { - self2.repair_on_read(&who[..], v.take().unwrap()).await?; + for v in to_repair { + self2.repair_on_read(&who[..], v).await?; } Ok(()) }); } - let ret_vec = ret - .iter_mut() - .take(limit) - .map(|(_k, v)| v.take().unwrap()) - .collect::>(); + + // At this point, the `ret` btreemap might contain more than `limit` + // items, because nodes might have returned us each `limit` items + // but for different keys. We have to take only the first `limit` items + // in this map, in the specified enumeration order, for two reasons: + // 1. To return to the user no more than the number of items that they requested + // 2. To return only items for which we have a read quorum: we do not know + // that we have a read quorum for the items after the first `limit` + // of them + let ret_vec = match enumeration_order { + EnumerationOrder::Forward => ret + .into_iter() + .take(limit) + .map(|(_k, v)| v) + .collect::>(), + EnumerationOrder::Reverse => ret + .into_iter() + .rev() + .take(limit) + .map(|(_k, v)| v) + .collect::>(), + }; Ok(ret_vec) } @@ -378,8 +430,20 @@ where let value = self.data.read_entry(key, sort_key)?; Ok(TableRpc::ReadEntryResponse(value)) } - TableRpc::ReadRange(key, begin_sort_key, filter, limit) => { - let values = self.data.read_range(key, begin_sort_key, filter, *limit)?; + TableRpc::ReadRange { + partition, + begin_sort_key, + filter, + limit, + enumeration_order, + } => { + let values = self.data.read_range( + partition, + begin_sort_key, + filter, + *limit, + *enumeration_order, + )?; Ok(TableRpc::Update(values)) } TableRpc::Update(pairs) => { diff --git a/src/table/util.rs b/src/table/util.rs index 2a5c3afe..20595a94 100644 --- a/src/table/util.rs +++ b/src/table/util.rs @@ -17,7 +17,7 @@ impl PartitionKey for EmptyKey { } } -#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum DeletedFilter { Any, Deleted, @@ -33,3 +33,19 @@ impl DeletedFilter { } } } + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub enum EnumerationOrder { + Forward, + Reverse, +} + +impl EnumerationOrder { + pub fn from_reverse(reverse: bool) -> Self { + if reverse { + Self::Reverse + } else { + Self::Forward + } + } +} -- cgit v1.2.3 From b44d3fc796484a50cd6854f20c9b46e5fddedc9d Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 8 Jun 2022 10:01:44 +0200 Subject: Abstract database behind generic interface and implement alternative drivers (#322) - [x] Design interface - [x] Implement Sled backend - [x] Re-implement the SledCountedTree hack ~~on Sled backend~~ on all backends (i.e. over the abstraction) - [x] Convert Garage code to use generic interface - [x] Proof-read converted Garage code - [ ] Test everything well - [x] Implement sqlite backend - [x] Implement LMDB backend - [ ] (Implement Persy backend?) - [ ] (Implement other backends? (like RocksDB, ...)) - [x] Implement backend choice in config file and garage server module - [x] Add CLI for converting between DB formats - Exploit the new interface to put more things in transactions - [x] `.updated()` trigger on Garage tables Fix #284 **Bugs** - [x] When exporting sqlite, trees iterate empty?? - [x] LMDB doesn't work **Known issues for various back-ends** - Sled: - Eats all my RAM and also all my disk space - `.len()` has to traverse the whole table - Is actually quite slow on some operations - And is actually pretty bad code... - Sqlite: - Requires a lock to be taken on all operations. The lock is also taken when iterating on a table with `.iter()`, and the lock isn't released until the iterator is dropped. This means that we must be VERY carefull to not do anything else inside a `.iter()` loop or else we will have a deadlock! Most such cases have been eliminated from the Garage codebase, but there might still be some that remain. If your Garage-over-Sqlite seems to hang/freeze, this is the reason. - (adapter uses a bunch of unsafe code) - Heed (LMDB): - Not suited for 32-bit machines as it has to map the whole DB in memory. - (adpater uses a tiny bit of unsafe code) **My recommendation:** avoid 32-bit machines and use LMDB as much as possible. **Converting databases** is actually quite easy. For example from Sled to LMDB: ```bash cd src/db cargo run --features cli --bin convert -- -i path/to/garage/meta/db -a sled -o path/to/garage/meta/db.lmdb -b lmdb ``` Then, just add this to your `config.toml`: ```toml db_engine = "lmdb" ``` Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/322 Co-authored-by: Alex Co-committed-by: Alex --- src/table/Cargo.toml | 3 +- src/table/data.rs | 113 +++++++++++++++++++++++++++------------------------ src/table/gc.rs | 41 +++++++++++-------- src/table/merkle.rs | 101 ++++++++++++++++++++++----------------------- src/table/metrics.rs | 21 +++++----- src/table/schema.rs | 19 ++++++--- src/table/sync.rs | 16 ++++++-- src/table/table.rs | 4 +- 8 files changed, 174 insertions(+), 144 deletions(-) (limited to 'src/table') diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index ed1a213f..6de37cda 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -14,6 +14,7 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +garage_db = { version = "0.8.0", path = "../db" } garage_rpc = { version = "0.7.0", path = "../rpc" } garage_util = { version = "0.7.0", path = "../util" } @@ -25,8 +26,6 @@ hexdump = "0.1" tracing = "0.1.30" rand = "0.8" -sled = "0.34" - rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/table/data.rs b/src/table/data.rs index 5cb10066..3212e82b 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -3,12 +3,13 @@ use std::convert::TryInto; use std::sync::Arc; use serde_bytes::ByteBuf; -use sled::{IVec, Transactional}; use tokio::sync::Notify; +use garage_db as db; +use garage_db::counted_tree_hack::CountedTree; + use garage_util::data::*; use garage_util::error::*; -use garage_util::sled_counter::SledCountedTree; use garage_rpc::system::System; @@ -25,12 +26,12 @@ pub struct TableData { pub instance: F, pub replication: R, - pub store: sled::Tree, + pub store: db::Tree, - pub(crate) merkle_tree: sled::Tree, - pub(crate) merkle_todo: sled::Tree, + pub(crate) merkle_tree: db::Tree, + pub(crate) merkle_todo: db::Tree, pub(crate) merkle_todo_notify: Notify, - pub(crate) gc_todo: SledCountedTree, + pub(crate) gc_todo: CountedTree, pub(crate) metrics: TableMetrics, } @@ -40,7 +41,7 @@ where F: TableSchema, R: TableReplication, { - pub fn new(system: Arc, instance: F, replication: R, db: &sled::Db) -> Arc { + pub fn new(system: Arc, instance: F, replication: R, db: &db::Db) -> Arc { let store = db .open_tree(&format!("{}:table", F::TABLE_NAME)) .expect("Unable to open DB tree"); @@ -55,7 +56,7 @@ where let gc_todo = db .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) .expect("Unable to open DB tree"); - let gc_todo = SledCountedTree::new(gc_todo); + let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2"); let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone()); @@ -98,30 +99,30 @@ where None => partition_hash.to_vec(), Some(sk) => self.tree_key(partition_key, sk), }; - let range = self.store.range(first_key..); + let range = self.store.range(first_key..)?; self.read_range_aux(partition_hash, range, filter, limit) } EnumerationOrder::Reverse => match start { Some(sk) => { let last_key = self.tree_key(partition_key, sk); - let range = self.store.range(..=last_key).rev(); + let range = self.store.range_rev(..=last_key)?; self.read_range_aux(partition_hash, range, filter, limit) } None => { let mut last_key = partition_hash.to_vec(); let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap()); last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1)); - let range = self.store.range(..last_key).rev(); + let range = self.store.range_rev(..last_key)?; self.read_range_aux(partition_hash, range, filter, limit) } }, } } - fn read_range_aux( + fn read_range_aux<'a>( &self, partition_hash: Hash, - range: impl Iterator>, + range: db::ValueIter<'a>, filter: &Option, limit: usize, ) -> Result>, Error> { @@ -139,7 +140,7 @@ where } }; if keep { - ret.push(Arc::new(ByteBuf::from(value.as_ref()))); + ret.push(Arc::new(ByteBuf::from(value))); } if ret.len() >= limit { break; @@ -183,12 +184,10 @@ where tree_key: &[u8], f: impl Fn(Option) -> F::E, ) -> Result, Error> { - let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - let (old_entry, old_bytes, new_entry) = match store.get(tree_key)? { + let changed = self.store.db().transaction(|mut tx| { + let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? { Some(old_bytes) => { - let old_entry = self - .decode_entry(&old_bytes) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; let new_entry = f(Some(old_entry.clone())); (Some(old_entry), Some(old_bytes), new_entry) } @@ -204,24 +203,28 @@ where // the associated Merkle tree entry. let new_bytes = rmp_to_vec_all_named(&new_entry) .map_err(Error::RmpEncode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + .map_err(db::TxError::Abort)?; let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]); + drop(old_bytes); if value_changed || encoding_changed { let new_bytes_hash = blake2sum(&new_bytes[..]); - mkl_todo.insert(tree_key.to_vec(), new_bytes_hash.as_slice())?; - store.insert(tree_key.to_vec(), new_bytes)?; - Ok(Some((old_entry, new_entry, new_bytes_hash))) + tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?; + tx.insert(&self.store, tree_key, new_bytes)?; + + self.instance + .updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?; + + Ok(Some((new_entry, new_bytes_hash))) } else { Ok(None) } })?; - if let Some((old_entry, new_entry, new_bytes_hash)) = changed { + if let Some((new_entry, new_bytes_hash)) = changed { self.metrics.internal_update_counter.add(1); let is_tombstone = new_entry.is_tombstone(); - self.instance.updated(old_entry.as_ref(), Some(&new_entry)); self.merkle_todo_notify.notify_one(); if is_tombstone { // We are only responsible for GC'ing this item if we are the @@ -244,22 +247,23 @@ where } pub(crate) fn delete_if_equal(self: &Arc, k: &[u8], v: &[u8]) -> Result { - let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { - if cur_v == v { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(true); + let removed = self + .store + .db() + .transaction(|mut tx| match tx.get(&self.store, k)? { + Some(cur_v) if cur_v == v => { + tx.remove(&self.store, k)?; + tx.insert(&self.merkle_todo, k, vec![])?; + + let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?; + self.instance.updated(&mut tx, Some(&old_entry), None)?; + Ok(true) } - } - Ok(false) - })?; + _ => Ok(false), + })?; if removed { self.metrics.internal_delete_counter.add(1); - - let old_entry = self.decode_entry(v)?; - self.instance.updated(Some(&old_entry), None); self.merkle_todo_notify.notify_one(); } Ok(removed) @@ -270,25 +274,26 @@ where k: &[u8], vhash: Hash, ) -> Result { - let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { - if blake2sum(&cur_v[..]) == vhash { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(Some(cur_v)); + let removed = self + .store + .db() + .transaction(|mut tx| match tx.get(&self.store, k)? { + Some(cur_v) if blake2sum(&cur_v[..]) == vhash => { + tx.remove(&self.store, k)?; + tx.insert(&self.merkle_todo, k, vec![])?; + + let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?; + self.instance.updated(&mut tx, Some(&old_entry), None)?; + Ok(true) } - } - Ok(None) - })?; + _ => Ok(false), + })?; - if let Some(old_v) = removed { - let old_entry = self.decode_entry(&old_v[..])?; - self.instance.updated(Some(&old_entry), None); + if removed { + self.metrics.internal_delete_counter.add(1); self.merkle_todo_notify.notify_one(); - Ok(true) - } else { - Ok(false) } + Ok(removed) } // ---- Utility functions ---- @@ -315,7 +320,7 @@ where } } - pub fn gc_todo_len(&self) -> usize { - self.gc_todo.len() + pub fn gc_todo_len(&self) -> Result { + Ok(self.gc_todo.len()) } } diff --git a/src/table/gc.rs b/src/table/gc.rs index 2a05b6ae..e7fbbcb0 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -12,9 +12,10 @@ use futures::select; use futures_util::future::*; use tokio::sync::watch; +use garage_db::counted_tree_hack::CountedTree; + use garage_util::data::*; use garage_util::error::*; -use garage_util::sled_counter::SledCountedTree; use garage_util::time::*; use garage_rpc::system::System; @@ -100,18 +101,16 @@ where async fn gc_loop_iter(&self) -> Result, Error> { let now = now_msec(); - let mut entries = vec![]; - let mut excluded = vec![]; - // List entries in the GC todo list // These entries are put there when a tombstone is inserted in the table // (see update_entry in data.rs) - for entry_kv in self.data.gc_todo.iter() { + let mut candidates = vec![]; + for entry_kv in self.data.gc_todo.iter()? { let (k, vhash) = entry_kv?; - let mut todo_entry = GcTodoEntry::parse(&k, &vhash); + let todo_entry = GcTodoEntry::parse(&k, &vhash); if todo_entry.deletion_time() > now { - if entries.is_empty() && excluded.is_empty() { + if candidates.is_empty() { // If the earliest entry in the todo list shouldn't yet be processed, // return a duration to wait in the loop return Ok(Some(Duration::from_millis( @@ -123,15 +122,23 @@ where } } - let vhash = Hash::try_from(&vhash[..]).unwrap(); + candidates.push(todo_entry); + if candidates.len() >= 2 * TABLE_GC_BATCH_SIZE { + break; + } + } + let mut entries = vec![]; + let mut excluded = vec![]; + for mut todo_entry in candidates { // Check if the tombstone is still the current value of the entry. // If not, we don't actually want to GC it, and we will remove it // from the gc_todo table later (below). + let vhash = todo_entry.value_hash; todo_entry.value = self .data .store - .get(&k[..])? + .get(&todo_entry.key[..])? .filter(|v| blake2sum(&v[..]) == vhash) .map(|v| v.to_vec()); @@ -353,17 +360,17 @@ impl GcTodoEntry { } /// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree - pub(crate) fn parse(sled_k: &[u8], sled_v: &[u8]) -> Self { + pub(crate) fn parse(db_k: &[u8], db_v: &[u8]) -> Self { Self { - tombstone_timestamp: u64::from_be_bytes(sled_k[0..8].try_into().unwrap()), - key: sled_k[8..].to_vec(), - value_hash: Hash::try_from(sled_v).unwrap(), + tombstone_timestamp: u64::from_be_bytes(db_k[0..8].try_into().unwrap()), + key: db_k[8..].to_vec(), + value_hash: Hash::try_from(db_v).unwrap(), value: None, } } /// Saves the GcTodoEntry in the gc_todo tree - pub(crate) fn save(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> { + pub(crate) fn save(&self, gc_todo_tree: &CountedTree) -> Result<(), Error> { gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?; Ok(()) } @@ -373,9 +380,9 @@ impl GcTodoEntry { /// This is usefull to remove a todo entry only under the condition /// that it has not changed since the time it was read, i.e. /// what we have to do is still the same - pub(crate) fn remove_if_equal(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> { - let _ = gc_todo_tree.compare_and_swap::<_, _, Vec>( - &self.todo_table_key()[..], + pub(crate) fn remove_if_equal(&self, gc_todo_tree: &CountedTree) -> Result<(), Error> { + gc_todo_tree.compare_and_swap::<_, _, &[u8]>( + &self.todo_table_key(), Some(self.value_hash), None, )?; diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 93bf7e47..7685b193 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -4,11 +4,10 @@ use std::time::Duration; use futures::select; use futures_util::future::*; use serde::{Deserialize, Serialize}; -use sled::transaction::{ - ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree, -}; use tokio::sync::watch; +use garage_db as db; + use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; @@ -90,35 +89,35 @@ where async fn updater_loop(self: Arc, mut must_exit: watch::Receiver) { while !*must_exit.borrow() { - if let Some(x) = self.data.merkle_todo.iter().next() { - match x { - Ok((key, valhash)) => { - if let Err(e) = self.update_item(&key[..], &valhash[..]) { - warn!( - "({}) Error while updating Merkle tree item: {}", - F::TABLE_NAME, - e - ); - } - } - Err(e) => { - warn!( - "({}) Error while iterating on Merkle todo tree: {}", - F::TABLE_NAME, - e - ); - tokio::time::sleep(Duration::from_secs(10)).await; + match self.updater_loop_iter() { + Ok(true) => (), + Ok(false) => { + select! { + _ = self.data.merkle_todo_notify.notified().fuse() => {}, + _ = must_exit.changed().fuse() => {}, } } - } else { - select! { - _ = self.data.merkle_todo_notify.notified().fuse() => {}, - _ = must_exit.changed().fuse() => {}, + Err(e) => { + warn!( + "({}) Error while updating Merkle tree item: {}", + F::TABLE_NAME, + e + ); + tokio::time::sleep(Duration::from_secs(10)).await; } } } } + fn updater_loop_iter(&self) -> Result { + if let Some((key, valhash)) = self.data.merkle_todo.first()? { + self.update_item(&key, &valhash)?; + Ok(true) + } else { + Ok(false) + } + } + fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> { let khash = blake2sum(k); @@ -137,13 +136,16 @@ where }; self.data .merkle_tree - .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?; + .db() + .transaction(|mut tx| self.update_item_rec(&mut tx, k, &khash, &key, new_vhash))?; - let deleted = self - .data - .merkle_todo - .compare_and_swap::<_, _, Vec>(k, Some(vhash_by), None)? - .is_ok(); + let deleted = self.data.merkle_todo.db().transaction(|mut tx| { + let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by); + if remove { + tx.remove(&self.data.merkle_todo, k)?; + } + Ok(remove) + })?; if !deleted { debug!( @@ -157,12 +159,12 @@ where fn update_item_rec( &self, - tx: &TransactionalTree, + tx: &mut db::Transaction<'_>, k: &[u8], khash: &Hash, key: &MerkleNodeKey, new_vhash: Option, - ) -> ConflictableTransactionResult, Error> { + ) -> db::TxResult, Error> { let i = key.prefix.len(); // Read node at current position (defined by the prefix stored in key) @@ -203,7 +205,7 @@ where } MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)), x @ MerkleNode::Leaf(_, _) => { - tx.remove(key_sub.encode())?; + tx.remove(&self.data.merkle_tree, key_sub.encode())?; Some(x) } } @@ -283,28 +285,27 @@ where fn read_node_txn( &self, - tx: &TransactionalTree, + tx: &mut db::Transaction<'_>, k: &MerkleNodeKey, - ) -> ConflictableTransactionResult { - let ent = tx.get(k.encode())?; - MerkleNode::decode_opt(ent).map_err(ConflictableTransactionError::Abort) + ) -> db::TxResult { + let ent = tx.get(&self.data.merkle_tree, k.encode())?; + MerkleNode::decode_opt(&ent).map_err(db::TxError::Abort) } fn put_node_txn( &self, - tx: &TransactionalTree, + tx: &mut db::Transaction<'_>, k: &MerkleNodeKey, v: &MerkleNode, - ) -> ConflictableTransactionResult { + ) -> db::TxResult { trace!("Put Merkle node: {:?} => {:?}", k, v); if *v == MerkleNode::Empty { - tx.remove(k.encode())?; + tx.remove(&self.data.merkle_tree, k.encode())?; Ok(self.empty_node_hash) } else { - let vby = rmp_to_vec_all_named(v) - .map_err(|e| ConflictableTransactionError::Abort(e.into()))?; + let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?; let rethash = blake2sum(&vby[..]); - tx.insert(k.encode(), vby)?; + tx.insert(&self.data.merkle_tree, k.encode(), vby)?; Ok(rethash) } } @@ -312,15 +313,15 @@ where // Access a node in the Merkle tree, used by the sync protocol pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result { let ent = self.data.merkle_tree.get(k.encode())?; - MerkleNode::decode_opt(ent) + MerkleNode::decode_opt(&ent) } - pub fn merkle_tree_len(&self) -> usize { - self.data.merkle_tree.len() + pub fn merkle_tree_len(&self) -> Result { + Ok(self.data.merkle_tree.len()?) } - pub fn todo_len(&self) -> usize { - self.data.merkle_todo.len() + pub fn todo_len(&self) -> Result { + Ok(self.data.merkle_todo.len()?) } } @@ -347,7 +348,7 @@ impl MerkleNodeKey { } impl MerkleNode { - fn decode_opt(ent: Option) -> Result { + fn decode_opt(ent: &Option) -> Result { match ent { None => Ok(MerkleNode::Empty), Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), diff --git a/src/table/metrics.rs b/src/table/metrics.rs index 752a2a6d..3a1783e0 100644 --- a/src/table/metrics.rs +++ b/src/table/metrics.rs @@ -1,6 +1,7 @@ use opentelemetry::{global, metrics::*, KeyValue}; -use garage_util::sled_counter::SledCountedTree; +use garage_db as db; +use garage_db::counted_tree_hack::CountedTree; /// TableMetrics reference all counter used for metrics pub struct TableMetrics { @@ -19,21 +20,19 @@ pub struct TableMetrics { pub(crate) sync_items_received: Counter, } impl TableMetrics { - pub fn new( - table_name: &'static str, - merkle_todo: sled::Tree, - gc_todo: SledCountedTree, - ) -> Self { + pub fn new(table_name: &'static str, merkle_todo: db::Tree, gc_todo: CountedTree) -> Self { let meter = global::meter(table_name); TableMetrics { _merkle_todo_len: meter .u64_value_observer( "table.merkle_updater_todo_queue_length", move |observer| { - observer.observe( - merkle_todo.len() as u64, - &[KeyValue::new("table_name", table_name)], - ) + if let Ok(v) = merkle_todo.len() { + observer.observe( + v as u64, + &[KeyValue::new("table_name", table_name)], + ); + } }, ) .with_description("Merkle tree updater TODO queue length") @@ -45,7 +44,7 @@ impl TableMetrics { observer.observe( gc_todo.len() as u64, &[KeyValue::new("table_name", table_name)], - ) + ); }, ) .with_description("Table garbage collector TODO queue length") diff --git a/src/table/schema.rs b/src/table/schema.rs index 37327037..74f57798 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -1,5 +1,6 @@ use serde::{Deserialize, Serialize}; +use garage_db as db; use garage_util::data::*; use crate::crdt::Crdt; @@ -82,11 +83,19 @@ pub trait TableSchema: Send + Sync { None } - // Updated triggers some stuff downstream, but it is not supposed to block or fail, - // as the update itself is an unchangeable fact that will never go back - // due to CRDT logic. Typically errors in propagation of info should be logged - // to stderr. - fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {} + /// Actions triggered by data changing in a table. If such actions + /// include updates to the local database that should be applied + /// atomically with the item update itself, a db transaction is + /// provided on which these changes should be done. + /// This function can return a DB error but that's all. + fn updated( + &self, + _tx: &mut db::Transaction, + _old: Option<&Self::E>, + _new: Option<&Self::E>, + ) -> db::TxOpResult<()> { + Ok(()) + } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool; } diff --git a/src/table/sync.rs b/src/table/sync.rs index 08069ad0..4c83e991 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -258,9 +258,9 @@ where while !*must_exit.borrow() { let mut items = Vec::new(); - for item in self.data.store.range(begin.to_vec()..end.to_vec()) { + for item in self.data.store.range(begin.to_vec()..end.to_vec())? { let (key, value) = item?; - items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref())))); + items.push((key.to_vec(), Arc::new(ByteBuf::from(value)))); if items.len() >= 1024 { break; @@ -603,8 +603,16 @@ impl SyncTodo { let retain = nodes.contains(&my_id); if !retain { // Check if we have some data to send, otherwise skip - if data.store.range(begin..end).next().is_none() { - continue; + match data.store.range(begin..end) { + Ok(mut iter) => { + if iter.next().is_none() { + continue; + } + } + Err(e) => { + warn!("DB error in add_full_sync: {}", e); + continue; + } } } diff --git a/src/table/table.rs b/src/table/table.rs index 2a167604..3c211728 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -13,6 +13,8 @@ use opentelemetry::{ Context, }; +use garage_db as db; + use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; @@ -69,7 +71,7 @@ where { // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== - pub fn new(instance: F, replication: R, system: Arc, db: &sled::Db) -> Arc { + pub fn new(instance: F, replication: R, system: Arc, db: &db::Db) -> Arc { let endpoint = system .netapp .endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME)); -- cgit v1.2.3 From 4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7 Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 8 Jul 2022 13:30:26 +0200 Subject: Background task manager (#332) - [x] New background worker trait - [x] Adapt all current workers to use new API - [x] Command to list currently running workers, and whether they are active, idle, or dead - [x] Error reporting - Optimizations - [x] Merkle updater: several items per iteration - [ ] Use `tokio::task::spawn_blocking` where appropriate so that CPU-intensive tasks don't block other things going on - scrub: - [x] have only one worker with a channel to start/pause/cancel - [x] automatic scrub - [x] ability to view and change tranquility from CLI - [x] persistence of a few info - [ ] Testing Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/332 Co-authored-by: Alex Co-committed-by: Alex --- src/table/gc.rs | 89 ++++++++++++++++------- src/table/merkle.rs | 87 ++++++++++++++--------- src/table/sync.rs | 198 ++++++++++++++++++++++++---------------------------- 3 files changed, 208 insertions(+), 166 deletions(-) (limited to 'src/table') diff --git a/src/table/gc.rs b/src/table/gc.rs index e7fbbcb0..12218d97 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -8,12 +8,11 @@ use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use futures::future::join_all; -use futures::select; -use futures_util::future::*; use tokio::sync::watch; use garage_db::counted_tree_hack::CountedTree; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; use garage_util::time::*; @@ -69,35 +68,11 @@ where gc.endpoint.set_handler(gc.clone()); - let gc1 = gc.clone(); - system.background.spawn_worker( - format!("GC loop for {}", F::TABLE_NAME), - move |must_exit: watch::Receiver| gc1.gc_loop(must_exit), - ); + system.background.spawn_worker(GcWorker::new(gc.clone())); gc } - async fn gc_loop(self: Arc, mut must_exit: watch::Receiver) { - while !*must_exit.borrow() { - match self.gc_loop_iter().await { - Ok(None) => { - // Stuff was done, loop immediately - } - Ok(Some(wait_delay)) => { - // Nothing was done, wait specified delay. - select! { - _ = tokio::time::sleep(wait_delay).fuse() => {}, - _ = must_exit.changed().fuse() => {}, - } - } - Err(e) => { - warn!("({}) Error doing GC: {}", F::TABLE_NAME, e); - } - } - } - } - async fn gc_loop_iter(&self) -> Result, Error> { let now = now_msec(); @@ -328,6 +303,66 @@ where } } +struct GcWorker +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + gc: Arc>, + wait_delay: Duration, +} + +impl GcWorker +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn new(gc: Arc>) -> Self { + Self { + gc, + wait_delay: Duration::from_secs(0), + } + } +} + +#[async_trait] +impl Worker for GcWorker +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn name(&self) -> String { + format!("{} GC", F::TABLE_NAME) + } + + fn info(&self) -> Option { + let l = self.gc.data.gc_todo_len().unwrap_or(0); + if l > 0 { + Some(format!("{} items in queue", l)) + } else { + None + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { + match self.gc.gc_loop_iter().await? { + None => Ok(WorkerState::Busy), + Some(delay) => { + self.wait_delay = delay; + Ok(WorkerState::Idle) + } + } + } + + async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { + if *must_exit.borrow() { + return WorkerState::Done; + } + tokio::time::sleep(self.wait_delay).await; + WorkerState::Busy + } +} + /// An entry stored in the gc_todo Sled tree associated with the table /// Contains helper function for parsing, saving, and removing /// such entry in Sled diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 7685b193..a5c29723 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -1,14 +1,13 @@ use std::sync::Arc; use std::time::Duration; -use futures::select; -use futures_util::future::*; +use async_trait::async_trait; use serde::{Deserialize, Serialize}; use tokio::sync::watch; use garage_db as db; -use garage_util::background::BackgroundRunner; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::Error; @@ -78,43 +77,17 @@ where empty_node_hash, }); - let ret2 = ret.clone(); - background.spawn_worker( - format!("Merkle tree updater for {}", F::TABLE_NAME), - |must_exit: watch::Receiver| ret2.updater_loop(must_exit), - ); + background.spawn_worker(MerkleWorker(ret.clone())); ret } - async fn updater_loop(self: Arc, mut must_exit: watch::Receiver) { - while !*must_exit.borrow() { - match self.updater_loop_iter() { - Ok(true) => (), - Ok(false) => { - select! { - _ = self.data.merkle_todo_notify.notified().fuse() => {}, - _ = must_exit.changed().fuse() => {}, - } - } - Err(e) => { - warn!( - "({}) Error while updating Merkle tree item: {}", - F::TABLE_NAME, - e - ); - tokio::time::sleep(Duration::from_secs(10)).await; - } - } - } - } - - fn updater_loop_iter(&self) -> Result { + fn updater_loop_iter(&self) -> Result { if let Some((key, valhash)) = self.data.merkle_todo.first()? { self.update_item(&key, &valhash)?; - Ok(true) + Ok(WorkerState::Busy) } else { - Ok(false) + Ok(WorkerState::Idle) } } @@ -325,6 +298,54 @@ where } } +struct MerkleWorker(Arc>) +where + F: TableSchema + 'static, + R: TableReplication + 'static; + +#[async_trait] +impl Worker for MerkleWorker +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn name(&self) -> String { + format!("{} Merkle tree updater", F::TABLE_NAME) + } + + fn info(&self) -> Option { + let l = self.0.todo_len().unwrap_or(0); + if l > 0 { + Some(format!("{} items in queue", l)) + } else { + None + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { + let updater = self.0.clone(); + tokio::task::spawn_blocking(move || { + for _i in 0..100 { + let s = updater.updater_loop_iter(); + if !matches!(s, Ok(WorkerState::Busy)) { + return s; + } + } + Ok(WorkerState::Busy) + }) + .await + .unwrap() + } + + async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { + if *must_exit.borrow() { + return WorkerState::Done; + } + tokio::time::sleep(Duration::from_secs(10)).await; + WorkerState::Busy + } +} + impl MerkleNodeKey { fn encode(&self) -> Vec { let mut ret = Vec::with_capacity(2 + self.prefix.len()); diff --git a/src/table/sync.rs b/src/table/sync.rs index 4c83e991..b3756a5e 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -1,17 +1,17 @@ use std::collections::VecDeque; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; -use futures::select; -use futures_util::future::*; use futures_util::stream::*; use opentelemetry::KeyValue; use rand::Rng; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; +use tokio::select; use tokio::sync::{mpsc, watch}; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::Error; @@ -34,7 +34,7 @@ pub struct TableSyncer data: Arc>, merkle: Arc>, - todo: Mutex, + add_full_sync_tx: mpsc::UnboundedSender<()>, endpoint: Arc>, } @@ -52,10 +52,6 @@ impl Rpc for SyncRpc { type Response = Result; } -struct SyncTodo { - todo: Vec, -} - #[derive(Debug, Clone)] struct TodoPartition { partition: Partition, @@ -80,118 +76,40 @@ where .netapp .endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME)); - let todo = SyncTodo { todo: vec![] }; + let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); let syncer = Arc::new(Self { system: system.clone(), data, merkle, - todo: Mutex::new(todo), + add_full_sync_tx, endpoint, }); syncer.endpoint.set_handler(syncer.clone()); - let (busy_tx, busy_rx) = mpsc::unbounded_channel(); - - let s1 = syncer.clone(); - system.background.spawn_worker( - format!("table sync watcher for {}", F::TABLE_NAME), - move |must_exit: watch::Receiver| s1.watcher_task(must_exit, busy_rx), - ); - - let s2 = syncer.clone(); - system.background.spawn_worker( - format!("table syncer for {}", F::TABLE_NAME), - move |must_exit: watch::Receiver| s2.syncer_task(must_exit, busy_tx), - ); - - let s3 = syncer.clone(); - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(20)).await; - s3.add_full_sync(); + system.background.spawn_worker(SyncWorker { + syncer: syncer.clone(), + ring_recv: system.ring.clone(), + ring: system.ring.borrow().clone(), + add_full_sync_rx, + todo: vec![], + next_full_sync: Instant::now() + Duration::from_secs(20), }); syncer } - async fn watcher_task( - self: Arc, - mut must_exit: watch::Receiver, - mut busy_rx: mpsc::UnboundedReceiver, - ) { - let mut prev_ring: Arc = self.system.ring.borrow().clone(); - let mut ring_recv: watch::Receiver> = self.system.ring.clone(); - let mut nothing_to_do_since = Some(Instant::now()); - - while !*must_exit.borrow() { - select! { - _ = ring_recv.changed().fuse() => { - let new_ring = ring_recv.borrow(); - if !Arc::ptr_eq(&new_ring, &prev_ring) { - debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME); - self.add_full_sync(); - prev_ring = new_ring.clone(); - } - } - busy_opt = busy_rx.recv().fuse() => { - if let Some(busy) = busy_opt { - if busy { - nothing_to_do_since = None; - } else if nothing_to_do_since.is_none() { - nothing_to_do_since = Some(Instant::now()); - } - } - } - _ = must_exit.changed().fuse() => {}, - _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { - if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) { - nothing_to_do_since = None; - debug!("({}) Interval passed, adding full sync to syncer todo list", F::TABLE_NAME); - self.add_full_sync(); - } - } - } - } - } - pub fn add_full_sync(&self) { - self.todo - .lock() - .unwrap() - .add_full_sync(&self.data, &self.system); - } - - async fn syncer_task( - self: Arc, - mut must_exit: watch::Receiver, - busy_tx: mpsc::UnboundedSender, - ) { - while !*must_exit.borrow() { - let task = self.todo.lock().unwrap().pop_task(); - if let Some(partition) = task { - busy_tx.send(true).unwrap(); - let res = self - .clone() - .sync_partition(&partition, &mut must_exit) - .await; - if let Err(e) = res { - warn!( - "({}) Error while syncing {:?}: {}", - F::TABLE_NAME, - partition, - e - ); - } - } else { - busy_tx.send(false).unwrap(); - tokio::time::sleep(Duration::from_secs(1)).await; - } + if self.add_full_sync_tx.send(()).is_err() { + error!("({}) Could not add full sync", F::TABLE_NAME); } } + // ---- + async fn sync_partition( - self: Arc, + self: &Arc, partition: &TodoPartition, must_exit: &mut watch::Receiver, ) -> Result<(), Error> { @@ -577,12 +495,22 @@ where } } -impl SyncTodo { - fn add_full_sync( - &mut self, - data: &TableData, - system: &System, - ) { +// -------- Sync Worker --------- + +struct SyncWorker { + syncer: Arc>, + ring_recv: watch::Receiver>, + ring: Arc, + add_full_sync_rx: mpsc::UnboundedReceiver<()>, + todo: Vec, + next_full_sync: Instant, +} + +impl SyncWorker { + fn add_full_sync(&mut self) { + let system = &self.syncer.system; + let data = &self.syncer.data; + let my_id = system.id; self.todo.clear(); @@ -623,6 +551,8 @@ impl SyncTodo { retain, }); } + + self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL; } fn pop_task(&mut self) -> Option { @@ -641,6 +571,62 @@ impl SyncTodo { } } +#[async_trait] +impl Worker for SyncWorker { + fn name(&self) -> String { + format!("{} sync", F::TABLE_NAME) + } + + fn info(&self) -> Option { + let l = self.todo.len(); + if l > 0 { + Some(format!("{} partitions remaining", l)) + } else { + None + } + } + + async fn work(&mut self, must_exit: &mut watch::Receiver) -> Result { + if let Some(partition) = self.pop_task() { + self.syncer.sync_partition(&partition, must_exit).await?; + Ok(WorkerState::Busy) + } else { + Ok(WorkerState::Idle) + } + } + + async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { + if *must_exit.borrow() { + return WorkerState::Done; + } + select! { + s = self.add_full_sync_rx.recv() => { + if let Some(()) = s { + self.add_full_sync(); + } + }, + _ = self.ring_recv.changed() => { + let new_ring = self.ring_recv.borrow(); + if !Arc::ptr_eq(&new_ring, &self.ring) { + self.ring = new_ring.clone(); + drop(new_ring); + debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME); + self.add_full_sync(); + } + }, + _ = tokio::time::sleep(self.next_full_sync - Instant::now()) => { + self.add_full_sync(); + } + } + match self.todo.is_empty() { + false => WorkerState::Busy, + true => WorkerState::Idle, + } + } +} + +// ---- UTIL ---- + fn hash_of(x: &T) -> Result { Ok(blake2sum(&rmp_to_vec_all_named(x)?[..])) } -- cgit v1.2.3 From 8e7e680afe39f48fe15f365c9ef3fee57596e119 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 15:20:00 +0200 Subject: First adaptation to WIP netapp with streaming body --- src/table/schema.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/table') diff --git a/src/table/schema.rs b/src/table/schema.rs index 74f57798..f37e98d8 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -60,7 +60,7 @@ pub trait Entry: } /// Trait for the schema used in a table -pub trait TableSchema: Send + Sync { +pub trait TableSchema: Send + Sync + 'static { /// The name of the table in the database const TABLE_NAME: &'static str; -- cgit v1.2.3 From df094bd8075332bb765b8b44c9b19cf2485e9ca8 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 16:30:44 +0200 Subject: Less strict timeouts --- src/table/gc.rs | 3 ++- src/table/sync.rs | 3 ++- src/table/table.rs | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) (limited to 'src/table') diff --git a/src/table/gc.rs b/src/table/gc.rs index 12218d97..6cae9701 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -25,7 +25,8 @@ use crate::replication::*; use crate::schema::*; const TABLE_GC_BATCH_SIZE: usize = 1024; -const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); +// Same timeout as NEED_BLOCK_QUERY_TIMEOUT in block manager +const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(15); // GC delay for table entries: 1 day (24 hours) // (the delay before the entry is added in the GC todo list diff --git a/src/table/sync.rs b/src/table/sync.rs index b3756a5e..62b88a58 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -24,7 +24,8 @@ use crate::merkle::*; use crate::replication::*; use crate::*; -const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30); +// Sync RPC can contain a lot of data, so have a 1min timeout +const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(60); // Do anti-entropy every 10 minutes const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); diff --git a/src/table/table.rs b/src/table/table.rs index 3c211728..51f3837f 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -31,7 +31,7 @@ use crate::schema::*; use crate::sync::*; use crate::util::*; -pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); +pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(30); pub struct Table { pub system: Arc, -- cgit v1.2.3 From 48ffaaadfc790142ed9556f5227913fa8c32d2ed Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 6 Sep 2022 16:47:56 +0200 Subject: Bump versions to 0.8.0 (compatibility is broken already) --- src/table/Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src/table') diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 6de37cda..ae52e8d7 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_table" -version = "0.7.0" +version = "0.8.0" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" @@ -15,8 +15,8 @@ path = "lib.rs" [dependencies] garage_db = { version = "0.8.0", path = "../db" } -garage_rpc = { version = "0.7.0", path = "../rpc" } -garage_util = { version = "0.7.0", path = "../util" } +garage_rpc = { version = "0.8.0", path = "../rpc" } +garage_util = { version = "0.8.0", path = "../util" } opentelemetry = "0.17" -- cgit v1.2.3 From 44733474bb6c9021c92b59e5c349b4b7ef71409a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Sep 2022 16:01:55 +0200 Subject: Remove/change println! in server code (fix #358) --- src/table/table.rs | 1 - 1 file changed, 1 deletion(-) (limited to 'src/table') diff --git a/src/table/table.rs b/src/table/table.rs index 51f3837f..8e801be6 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -113,7 +113,6 @@ where async fn insert_internal(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); - //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = TableRpc::::Update(vec![e_enc]); -- cgit v1.2.3 From 56592e18538b379ccaaa7b7c1990a599ac83b191 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 19 Sep 2022 20:12:19 +0200 Subject: RPC performance changes - configurable ping timeout - single, much higher, configurable RPC timeout - no more concurrency semaphore --- src/table/gc.rs | 10 ++-------- src/table/sync.rs | 16 ++++------------ src/table/table.rs | 14 +++----------- 3 files changed, 9 insertions(+), 31 deletions(-) (limited to 'src/table') diff --git a/src/table/gc.rs b/src/table/gc.rs index 6cae9701..83e7eeff 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -25,8 +25,6 @@ use crate::replication::*; use crate::schema::*; const TABLE_GC_BATCH_SIZE: usize = 1024; -// Same timeout as NEED_BLOCK_QUERY_TIMEOUT in block manager -const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(15); // GC delay for table entries: 1 day (24 hours) // (the delay before the entry is added in the GC todo list @@ -237,9 +235,7 @@ where &self.endpoint, &nodes[..], GcRpc::Update(updates), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_quorum(nodes.len()) - .with_timeout(TABLE_GC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()), ) .await .err_context("GC: send tombstones")?; @@ -260,9 +256,7 @@ where &self.endpoint, &nodes[..], GcRpc::DeleteIfEqualHash(deletes), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_quorum(nodes.len()) - .with_timeout(TABLE_GC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()), ) .await .err_context("GC: remote delete tombstones")?; diff --git a/src/table/sync.rs b/src/table/sync.rs index 62b88a58..76402d28 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -24,9 +24,6 @@ use crate::merkle::*; use crate::replication::*; use crate::*; -// Sync RPC can contain a lot of data, so have a 1min timeout -const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(60); - // Do anti-entropy every 10 minutes const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); @@ -248,9 +245,7 @@ where &self.endpoint, nodes, SyncRpc::Items(values), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_quorum(nodes.len()) - .with_timeout(TABLE_SYNC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()), ) .await?; @@ -311,8 +306,7 @@ where &self.endpoint, who, SyncRpc::RootCkHash(partition.partition, root_ck_hash), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_timeout(TABLE_SYNC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND), ) .await?; @@ -368,8 +362,7 @@ where &self.endpoint, who, SyncRpc::GetNode(key.clone()), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_timeout(TABLE_SYNC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND), ) .await? { @@ -445,8 +438,7 @@ where &self.endpoint, who, SyncRpc::Items(values), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_timeout(TABLE_SYNC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND), ) .await?; if let SyncRpc::Ok = rpc_resp { diff --git a/src/table/table.rs b/src/table/table.rs index 8e801be6..8a66c420 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -1,7 +1,6 @@ use std::borrow::Borrow; use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; -use std::time::Duration; use async_trait::async_trait; use futures::stream::*; @@ -31,8 +30,6 @@ use crate::schema::*; use crate::sync::*; use crate::util::*; -pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(30); - pub struct Table { pub system: Arc, pub data: Arc>, @@ -124,8 +121,7 @@ where &who[..], rpc, RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.data.replication.write_quorum()) - .with_timeout(TABLE_RPC_TIMEOUT), + .with_quorum(self.data.replication.write_quorum()), ) .await?; @@ -177,7 +173,7 @@ where &self.endpoint, node, rpc, - RequestStrategy::with_priority(PRIO_NORMAL).with_timeout(TABLE_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_NORMAL), ) .await?; Ok::<_, Error>((node, resp)) @@ -234,7 +230,6 @@ where rpc, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.data.replication.read_quorum()) - .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) .await?; @@ -329,7 +324,6 @@ where rpc, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.data.replication.read_quorum()) - .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) .await?; @@ -406,9 +400,7 @@ where &self.endpoint, who, TableRpc::::Update(vec![what_enc]), - RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(who.len()) - .with_timeout(TABLE_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(who.len()), ) .await?; Ok(()) -- cgit v1.2.3 From 1f7b050b7dd975642c8cd5d8a7562d347cfa528d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 20 Sep 2022 11:49:48 +0200 Subject: Change a warn! into a debug! --- src/table/Cargo.toml | 1 + src/table/sync.rs | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) (limited to 'src/table') diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index ae52e8d7..38c6b41c 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -22,6 +22,7 @@ opentelemetry = "0.17" async-trait = "0.1.7" bytes = "1.0" +hex = "0.4" hexdump = "0.1" tracing = "0.1.30" rand = "0.8" diff --git a/src/table/sync.rs b/src/table/sync.rs index 62b88a58..28e99dd3 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -351,11 +351,11 @@ where // Just send that item directly if let Some(val) = self.data.store.get(&ik[..])? { if blake2sum(&val[..]) != ivhash { - warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik); + debug!("({}) Hashes differ between stored value and Merkle tree, key: {} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, hex::encode(ik)); } todo_items.push(val.to_vec()); } else { - warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik); + debug!("({}) Item from Merkle tree not found in store: {} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, hex::encode(ik)); } } MerkleNode::Intermediate(l) => { -- cgit v1.2.3 From ad917ffd3f76316e48b89ff17e2f8a600a269481 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 29 Sep 2022 15:53:54 +0200 Subject: Fix instant substractions that might have panicked --- src/table/sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/table') diff --git a/src/table/sync.rs b/src/table/sync.rs index e34aa8d7..9d79d856 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -607,7 +607,7 @@ impl Worker for SyncWor self.add_full_sync(); } }, - _ = tokio::time::sleep(self.next_full_sync - Instant::now()) => { + _ = tokio::time::sleep_until(self.next_full_sync.into()) => { self.add_full_sync(); } } -- cgit v1.2.3