From 5768bf362262f78376af14517c4921941986192e Mon Sep 17 00:00:00 2001
From: Alex
Date: Tue, 10 May 2022 13:16:57 +0200
Subject: First implementation of K2V (#293)

**Specification:**

View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md)

- [x] Specify the structure of K2V triples
- [x] Specify the DVVS format used for causality detection
- [x] Specify the K2V index (just a counter of the number of values per partition key)
- [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem
- [x] Specify index endpoint: ReadIndex
- [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch
- [x] Move to JSON objects instead of tuples
- [x] Specify endpoints for polling for updates on single values (PollItem)

**Implementation:**

- [x] Table for K2V items, causal contexts
- [x] Indexing mechanism and table for K2V index
- [x] Make API handlers a bit more generic
- [x] K2V API endpoint
- [x] K2V API router
- [x] ReadItem
- [x] InsertItem
- [x] DeleteItem
- [x] PollItem
- [x] ReadIndex
- [x] InsertBatch
- [x] ReadBatch
- [x] DeleteBatch

**Testing:**

- [x] A simple Python script that makes a few requests so that results can be checked visually (it does no parsing of results and makes no assertions on returned values)
- [x] Actual tests:
  - [x] Adapt testing framework
  - [x] Simple test with InsertItem + ReadItem
  - [x] Test with several Insert/Read/DeleteItem + ReadIndex
  - [x] Test all combinations of return formats for ReadItem
  - [x] Test with ReadBatch, InsertBatch, DeleteBatch
  - [x] Test with PollItem
  - [x] Test error codes
- [ ] Fix most broken stuff
  - [x] test PollItem broken randomly
  - [x] when invalid causality tokens are given, errors should be 4xx not 5xx

**Improvements:**

- [x] Descending range queries
  - [x] Specify
  - [x] Implement
  - [x] Add test
- [x] Batch updates to index counter
- [x] Put K2V behind `k2v` feature flag

Co-authored-by: Alex Auvolat
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293
Co-authored-by: Alex
Co-committed-by: Alex
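To make the DVVS causality rules concrete, here is a small sketch (not part of the patch) of how `K2VItem::new()`, `update()`, `causal_context()` and `values()` from the diff below interact. The bucket and node UUIDs are placeholders passed in by the caller; `CausalContext` comes from `src/model/k2v/causality.rs`, which is not shown in this log.

```rust
// Sketch only: illustrates the conflict-resolution behaviour implemented by
// K2VItem::update() in the patch below. bucket_id, node_a and node_b are
// placeholder UUIDs supplied by the caller.
fn dvvs_example(bucket_id: Uuid, node_a: Uuid, node_b: Uuid) {
    let mut item = K2VItem::new(bucket_id, "pk".to_string(), "sk".to_string());

    // Two writes that carry no causality token are concurrent:
    // both values are kept and the item is in conflict.
    item.update(node_a, &None, DvvsValue::Value(b"v1".to_vec()));
    item.update(node_b, &None, DvvsValue::Value(b"v2".to_vec()));
    assert_eq!(item.values().len(), 2);

    // A read returns the causal context: a vector clock holding, for each
    // node, the highest timestamp present in the item.
    let ctx = item.causal_context();

    // A write carrying that context supersedes both previous values:
    // update() raises t_discard for every node listed in the context and
    // discards older timestamps before appending the new value.
    item.update(node_a, &Some(ctx), DvvsValue::Value(b"v3".to_vec()));
    assert_eq!(item.values().len(), 1);
}
```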
---
 src/model/k2v/item_table.rs | 291 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 291 insertions(+)
 create mode 100644 src/model/k2v/item_table.rs

diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
new file mode 100644
index 00000000..8b7cc08a
--- /dev/null
+++ b/src/model/k2v/item_table.rs
@@ -0,0 +1,291 @@
+use serde::{Deserialize, Serialize};
+use std::collections::BTreeMap;
+use std::sync::Arc;
+
+use garage_util::data::*;
+
+use garage_table::crdt::*;
+use garage_table::*;
+
+use crate::index_counter::*;
+use crate::k2v::causality::*;
+use crate::k2v::counter_table::*;
+use crate::k2v::poll::*;
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct K2VItem {
+    pub partition: K2VItemPartition,
+    pub sort_key: String,
+
+    items: BTreeMap<K2VNodeId, DvvsEntry>,
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, Hash, Eq)]
+pub struct K2VItemPartition {
+    pub bucket_id: Uuid,
+    pub partition_key: String,
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+struct DvvsEntry {
+    t_discard: u64,
+    values: Vec<(u64, DvvsValue)>,
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub enum DvvsValue {
+    Value(#[serde(with = "serde_bytes")] Vec<u8>),
+    Deleted,
+}
+
+impl K2VItem {
+    /// Creates a new K2VItem when no previous entry existed in the db
+    pub fn new(bucket_id: Uuid, partition_key: String, sort_key: String) -> Self {
+        Self {
+            partition: K2VItemPartition {
+                bucket_id,
+                partition_key,
+            },
+            sort_key,
+            items: BTreeMap::new(),
+        }
+    }
+    /// Updates a K2VItem with a new value or a deletion event
+    pub fn update(
+        &mut self,
+        this_node: Uuid,
+        context: &Option<CausalContext>,
+        new_value: DvvsValue,
+    ) {
+        if let Some(context) = context {
+            for (node, t_discard) in context.vector_clock.iter() {
+                if let Some(e) = self.items.get_mut(node) {
+                    e.t_discard = std::cmp::max(e.t_discard, *t_discard);
+                } else {
+                    self.items.insert(
+                        *node,
+                        DvvsEntry {
+                            t_discard: *t_discard,
+                            values: vec![],
+                        },
+                    );
+                }
+            }
+        }
+
+        self.discard();
+
+        let node_id = make_node_id(this_node);
+        let e = self.items.entry(node_id).or_insert(DvvsEntry {
+            t_discard: 0,
+            values: vec![],
+        });
+        let t_prev = e.max_time();
+        e.values.push((t_prev + 1, new_value));
+    }
+
+    /// Extract the causality context of a K2V Item
+    pub fn causal_context(&self) -> CausalContext {
+        let mut cc = CausalContext::new_empty();
+        for (node, ent) in self.items.iter() {
+            cc.vector_clock.insert(*node, ent.max_time());
+        }
+        cc
+    }
+
+    /// Extract the list of values
+    pub fn values(&'_ self) -> Vec<&'_ DvvsValue> {
+        let mut ret = vec![];
+        for (_, ent) in self.items.iter() {
+            for (_, v) in ent.values.iter() {
+                if !ret.contains(&v) {
+                    ret.push(v);
+                }
+            }
+        }
+        ret
+    }
+
+    fn discard(&mut self) {
+        for (_, ent) in self.items.iter_mut() {
+            ent.discard();
+        }
+    }
+
+    // returns counters: (non-deleted entries, conflict entries, non-tombstone values, bytes used)
+    fn stats(&self) -> (i64, i64, i64, i64) {
+        let values = self.values();
+
+        let n_entries = if self.is_tombstone() { 0 } else { 1 };
+        let n_conflicts = if values.len() > 1 { 1 } else { 0 };
+        let n_values = values
+            .iter()
+            .filter(|v| matches!(v, DvvsValue::Value(_)))
+            .count() as i64;
+        let n_bytes = values
+            .iter()
+            .map(|v| match v {
+                DvvsValue::Deleted => 0,
+                DvvsValue::Value(v) => v.len() as i64,
+            })
+            .sum();
+
+        (n_entries, n_conflicts, n_values, n_bytes)
+    }
+}
+
+impl DvvsEntry {
+    fn max_time(&self) -> u64 {
+        self.values
+            .iter()
+            .fold(self.t_discard, |acc, (vts, _)| std::cmp::max(acc, *vts))
+    }
+
+    fn discard(&mut self) {
+        self.values = std::mem::take(&mut self.values)
+            .into_iter()
+            .filter(|(t, _)| *t > self.t_discard)
+            .collect::<Vec<_>>();
+    }
+}
+
+impl Crdt for K2VItem {
+    fn merge(&mut self, other: &Self) {
+        for (node, e2) in other.items.iter() {
+            if let Some(e) = self.items.get_mut(node) {
+                e.merge(e2);
+            } else {
+                self.items.insert(*node, e2.clone());
+            }
+        }
+    }
+}
+
+impl Crdt for DvvsEntry {
+    fn merge(&mut self, other: &Self) {
+        self.t_discard = std::cmp::max(self.t_discard, other.t_discard);
+        self.discard();
+
+        let t_max = self.max_time();
+        for (vt, vv) in other.values.iter() {
+            if *vt > t_max {
+                self.values.push((*vt, vv.clone()));
+            }
+        }
+    }
+}
+
+impl PartitionKey for K2VItemPartition {
+    fn hash(&self) -> Hash {
+        use blake2::{Blake2b, Digest};
+
+        let mut hasher = Blake2b::new();
+        hasher.update(self.bucket_id.as_slice());
+        hasher.update(self.partition_key.as_bytes());
+        let mut hash = [0u8; 32];
+        hash.copy_from_slice(&hasher.finalize()[..32]);
+        hash.into()
+    }
+}
+
+impl Entry<K2VItemPartition, String> for K2VItem {
+    fn partition_key(&self) -> &K2VItemPartition {
+        &self.partition
+    }
+    fn sort_key(&self) -> &String {
+        &self.sort_key
+    }
+    fn is_tombstone(&self) -> bool {
+        self.values()
+            .iter()
+            .all(|v| matches!(v, DvvsValue::Deleted))
+    }
+}
+
+pub struct K2VItemTable {
+    pub(crate) counter_table: Arc<IndexCounter<K2VCounterTable>>,
+    pub(crate) subscriptions: Arc<SubscriptionManager>,
+}
+
+#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
+pub struct ItemFilter {
+    pub exclude_only_tombstones: bool,
+    pub conflicts_only: bool,
+}
+
+impl TableSchema for K2VItemTable {
+    const TABLE_NAME: &'static str = "k2v_item";
+
+    type P = K2VItemPartition;
+    type S = String;
+    type E = K2VItem;
+    type Filter = ItemFilter;
+
+    fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
+        // 1. Count
+        let (old_entries, old_conflicts, old_values, old_bytes) = match old {
+            None => (0, 0, 0, 0),
+            Some(e) => e.stats(),
+        };
+        let (new_entries, new_conflicts, new_values, new_bytes) = match new {
+            None => (0, 0, 0, 0),
+            Some(e) => e.stats(),
+        };
+
+        let count_pk = old
+            .map(|e| e.partition.bucket_id)
+            .unwrap_or_else(|| new.unwrap().partition.bucket_id);
+        let count_sk = old
+            .map(|e| &e.partition.partition_key)
+            .unwrap_or_else(|| &new.unwrap().partition.partition_key);
+
+        if let Err(e) = self.counter_table.count(
+            &count_pk,
+            count_sk,
+            &[
+                (ENTRIES, new_entries - old_entries),
+                (CONFLICTS, new_conflicts - old_conflicts),
+                (VALUES, new_values - old_values),
+                (BYTES, new_bytes - old_bytes),
+            ],
+        ) {
+            error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e);
+        }
+
+        // 2. Notify
+        if let Some(new_ent) = new {
+            self.subscriptions.notify(new_ent);
+        }
+    }
+
+    #[allow(clippy::nonminimal_bool)]
+    fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
+        let v = entry.values();
+        !(filter.conflicts_only && v.len() < 2)
+            && !(filter.exclude_only_tombstones && entry.is_tombstone())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_dvvsentry_merge_simple() {
+        let e1 = DvvsEntry {
+            t_discard: 4,
+            values: vec![
+                (5, DvvsValue::Value(vec![15])),
+                (6, DvvsValue::Value(vec![16])),
+            ],
+        };
+        let e2 = DvvsEntry {
+            t_discard: 5,
+            values: vec![(6, DvvsValue::Value(vec![16])), (7, DvvsValue::Deleted)],
+        };
+
+        let mut e3 = e1.clone();
+        e3.merge(&e2);
+        assert_eq!(e2, e3);
+    }
+}

From b44d3fc796484a50cd6854f20c9b46e5fddedc9d Mon Sep 17 00:00:00 2001
From: Alex
Date: Wed, 8 Jun 2022 10:01:44 +0200
Subject: Abstract database behind generic interface and implement alternative drivers (#322)

- [x] Design interface
- [x] Implement Sled backend
- [x] Re-implement the SledCountedTree hack ~~on Sled backend~~ on all backends (i.e. over the abstraction)
- [x] Convert Garage code to use generic interface
- [x] Proof-read converted Garage code
- [ ] Test everything well
- [x] Implement sqlite backend
- [x] Implement LMDB backend
- [ ] (Implement Persy backend?)
- [ ] (Implement other backends? (like RocksDB, ...))
- [x] Implement backend choice in config file and garage server module
- [x] Add CLI for converting between DB formats
- Exploit the new interface to put more things in transactions
  - [x] `.updated()` trigger on Garage tables

Fix #284

**Bugs**

- [x] When exporting sqlite, trees iterate empty??
- [x] LMDB doesn't work

**Known issues for various back-ends**

- Sled:
  - Eats all my RAM and also all my disk space
  - `.len()` has to traverse the whole table
  - Is actually quite slow on some operations
  - And is actually pretty bad code...
- Sqlite:
  - Requires a lock to be taken on all operations. The lock is also taken when iterating on a table with `.iter()`, and it isn't released until the iterator is dropped. This means we must be very careful not to do anything else inside a `.iter()` loop, or else we will have a deadlock! Most such cases have been eliminated from the Garage codebase, but there might still be some that remain. If your Garage-over-Sqlite seems to hang or freeze, this is the reason (see the sketch just after this list).
  - (adapter uses a bunch of unsafe code)
- Heed (LMDB):
  - Not suited for 32-bit machines, as it has to map the whole DB in memory.
  - (adapter uses a tiny bit of unsafe code)
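To illustrate the Sqlite pitfall above, here is a hedged sketch of the pattern to avoid with the new `garage_db` abstraction. The `iter()`/`insert()` methods and the `db::Result` alias are assumed from the interface introduced in this PR; exact names and signatures may differ slightly.

```rust
// Sketch of the Sqlite locking pitfall described above; the API surface
// (db::Tree, iter(), insert(), db::Result) is assumed, not quoted.
fn rewrite_all(tree: &db::Tree, transform: impl Fn(&[u8]) -> Vec<u8>) -> db::Result<()> {
    // BAD: on the Sqlite backend, the iterator keeps holding the global lock
    // until it is dropped, so the insert() below waits for a lock that this
    // very loop is already holding -> deadlock.
    //
    // for entry in tree.iter()? {
    //     let (k, v) = entry?;
    //     tree.insert(&k, transform(&v))?;
    // }

    // OK: finish (and drop) the iteration before writing anything back.
    let mut updates = Vec::new();
    for entry in tree.iter()? {
        let (k, v) = entry?;
        updates.push((k.to_vec(), transform(&v)));
    }
    for (k, v) in updates {
        tree.insert(&k, &v)?;
    }
    Ok(())
}
```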
**My recommendation:** avoid 32-bit machines and use LMDB as much as possible.

**Converting databases** is actually quite easy. For example, from Sled to LMDB:

```bash
cd src/db
cargo run --features cli --bin convert -- -i path/to/garage/meta/db -a sled -o path/to/garage/meta/db.lmdb -b lmdb
```

Then, just add this to your `config.toml`:

```toml
db_engine = "lmdb"
```

Co-authored-by: Alex Auvolat
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/322
Co-authored-by: Alex
Co-committed-by: Alex
---
 src/model/k2v/item_table.rs | 24 ++++++++++++++++++++----
 1 file changed, 20 insertions(+), 4 deletions(-)

diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index 8b7cc08a..991fe66d 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize};
 use std::collections::BTreeMap;
 use std::sync::Arc;
 
+use garage_db as db;
 use garage_util::data::*;
 
 use garage_table::crdt::*;
 use garage_table::*;
@@ -221,7 +222,12 @@ impl TableSchema for K2VItemTable {
     type E = K2VItem;
     type Filter = ItemFilter;
 
-    fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
+    fn updated(
+        &self,
+        tx: &mut db::Transaction,
+        old: Option<&Self::E>,
+        new: Option<&Self::E>,
+    ) -> db::TxOpResult<()> {
         // 1. Count
         let (old_entries, old_conflicts, old_values, old_bytes) = match old {
             None => (0, 0, 0, 0),
@@ -239,7 +245,8 @@ impl TableSchema for K2VItemTable {
             .map(|e| &e.partition.partition_key)
             .unwrap_or_else(|| &new.unwrap().partition.partition_key);
 
-        if let Err(e) = self.counter_table.count(
+        let counter_res = self.counter_table.count(
+            tx,
             &count_pk,
             count_sk,
             &[
@@ -248,14 +255,23 @@ impl TableSchema for K2VItemTable {
                 (VALUES, new_values - old_values),
                 (BYTES, new_bytes - old_bytes),
             ],
-        ) {
-            error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e);
+        );
+        if let Err(e) = db::unabort(counter_res)? {
+            // This result can be returned by `counter_table.count()` for instance
+            // if messagepack serialization or deserialization fails at some step.
+            // Warn admin but ignore this error for now, that's all we can do.
+            error!(
+                "Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!",
+                count_pk, count_sk, e
+            );
         }
 
         // 2. Notify
         if let Some(new_ent) = new {
             self.subscriptions.notify(new_ent);
         }
+
+        Ok(())
     }
 
     #[allow(clippy::nonminimal_bool)]

From 77e3fd6db2c9cd3a10889bd071e95ef839cfbefc Mon Sep 17 00:00:00 2001
From: Alex
Date: Wed, 15 Jun 2022 20:20:28 +0200
Subject: improve internal item counter mechanisms and implement bucket quotas (#326)

- [x] Refactoring of internal counting API
- [x] Repair procedure for counters (it's an offline procedure!!!)
- [x] New counter for objects in buckets
- [x] Add quotas to buckets struct
- [x] Add CLI to manage bucket quotas
- [x] Add admin API to manage bucket quotas
- [x] Apply quotas by adding checks on put operations
- [x] Proof-read

Co-authored-by: Alex Auvolat
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/326
Co-authored-by: Alex
Co-committed-by: Alex
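For context on the refactored counting API used by the diff below: for each table update the index counter is presumably nudged by `counts(new) - counts(old)`, per counter name, which is exactly what the old inline code in `updated()` computed by hand with ENTRIES/CONFLICTS/VALUES/BYTES. The helper below is only an illustration of that delta rule (hypothetical, not code from this PR); the real mechanism lives in `index_counter.rs` and uses the `CountedItem` implementation added here.

```rust
// Illustration only: the per-counter delta applied for one old -> new
// transition of a counted item. Not part of the PR; it just spells out the
// counts(new) - counts(old) rule using the CountedItem trait.
fn counter_deltas<T: CountedItem>(old: Option<&T>, new: Option<&T>) -> Vec<(&'static str, i64)> {
    use std::collections::BTreeMap;

    let mut deltas = BTreeMap::<&'static str, i64>::new();
    if let Some(e) = new {
        for (name, v) in e.counts() {
            *deltas.entry(name).or_insert(0) += v;
        }
    }
    if let Some(e) = old {
        for (name, v) in e.counts() {
            *deltas.entry(name).or_insert(0) -= v;
        }
    }
    // e.g. deleting the last value of a key yields (ENTRIES, -1), (BYTES, -n), ...
    deltas.into_iter().collect()
}
```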
---
 src/model/k2v/item_table.rs | 102 ++++++++++++++++++++++----------------------
 1 file changed, 50 insertions(+), 52 deletions(-)

diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index 991fe66d..baa1db4b 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -10,9 +10,13 @@ use garage_table::*;
 
 use crate::index_counter::*;
 use crate::k2v::causality::*;
-use crate::k2v::counter_table::*;
 use crate::k2v::poll::*;
 
+pub const ENTRIES: &str = "entries";
+pub const CONFLICTS: &str = "conflicts";
+pub const VALUES: &str = "values";
+pub const BYTES: &str = "bytes";
+
 #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
 pub struct K2VItem {
     pub partition: K2VItemPartition,
@@ -112,27 +116,6 @@ impl K2VItem {
             ent.discard();
         }
     }
-
-    // returns counters: (non-deleted entries, conflict entries, non-tombstone values, bytes used)
-    fn stats(&self) -> (i64, i64, i64, i64) {
-        let values = self.values();
-
-        let n_entries = if self.is_tombstone() { 0 } else { 1 };
-        let n_conflicts = if values.len() > 1 { 1 } else { 0 };
-        let n_values = values
-            .iter()
-            .filter(|v| matches!(v, DvvsValue::Value(_)))
-            .count() as i64;
-        let n_bytes = values
-            .iter()
-            .map(|v| match v {
-                DvvsValue::Deleted => 0,
-                DvvsValue::Value(v) => v.len() as i64,
-            })
-            .sum();
-
-        (n_entries, n_conflicts, n_values, n_bytes)
-    }
 }
 
 impl DvvsEntry {
@@ -204,7 +187,7 @@ impl Entry<K2VItemPartition, String> for K2VItem {
 }
 
 pub struct K2VItemTable {
-    pub(crate) counter_table: Arc<IndexCounter<K2VCounterTable>>,
+    pub(crate) counter_table: Arc<IndexCounter<K2VItem>>,
     pub(crate) subscriptions: Arc<SubscriptionManager>,
 }
 
@@ -229,40 +212,14 @@ impl TableSchema for K2VItemTable {
         new: Option<&Self::E>,
     ) -> db::TxOpResult<()> {
         // 1. Count
-        let (old_entries, old_conflicts, old_values, old_bytes) = match old {
-            None => (0, 0, 0, 0),
-            Some(e) => e.stats(),
-        };
-        let (new_entries, new_conflicts, new_values, new_bytes) = match new {
-            None => (0, 0, 0, 0),
-            Some(e) => e.stats(),
-        };
-
-        let count_pk = old
-            .map(|e| e.partition.bucket_id)
-            .unwrap_or_else(|| new.unwrap().partition.bucket_id);
-        let count_sk = old
-            .map(|e| &e.partition.partition_key)
-            .unwrap_or_else(|| &new.unwrap().partition.partition_key);
-
-        let counter_res = self.counter_table.count(
-            tx,
-            &count_pk,
-            count_sk,
-            &[
-                (ENTRIES, new_entries - old_entries),
-                (CONFLICTS, new_conflicts - old_conflicts),
-                (VALUES, new_values - old_values),
-                (BYTES, new_bytes - old_bytes),
-            ],
-        );
+        let counter_res = self.counter_table.count(tx, old, new);
         if let Err(e) = db::unabort(counter_res)? {
             // This result can be returned by `counter_table.count()` for instance
             // if messagepack serialization or deserialization fails at some step.
             // Warn admin but ignore this error for now, that's all we can do.
             error!(
-                "Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!",
-                count_pk, count_sk, e
+                "Unable to update K2V item counter: {}. Index values will be wrong!",
+                e
             );
         }
 
         // 2. Notify
         if let Some(new_ent) = new {
             self.subscriptions.notify(new_ent);
         }
@@ -282,6 +239,47 @@ impl TableSchema for K2VItemTable {
     }
 }
 
+impl CountedItem for K2VItem {
+    const COUNTER_TABLE_NAME: &'static str = "k2v_index_counter_v2";
+
+    // Partition key = bucket id
+    type CP = Uuid;
+    // Sort key = K2V item's partition key
+    type CS = String;
+
+    fn counter_partition_key(&self) -> &Uuid {
+        &self.partition.bucket_id
+    }
+    fn counter_sort_key(&self) -> &String {
+        &self.partition.partition_key
+    }
+
+    fn counts(&self) -> Vec<(&'static str, i64)> {
+        let values = self.values();
+
+        let n_entries = if self.is_tombstone() { 0 } else { 1 };
+        let n_conflicts = if values.len() > 1 { 1 } else { 0 };
+        let n_values = values
+            .iter()
+            .filter(|v| matches!(v, DvvsValue::Value(_)))
+            .count() as i64;
+        let n_bytes = values
+            .iter()
+            .map(|v| match v {
+                DvvsValue::Deleted => 0,
+                DvvsValue::Value(v) => v.len() as i64,
+            })
+            .sum();
+
+        vec![
+            (ENTRIES, n_entries),
+            (CONFLICTS, n_conflicts),
+            (VALUES, n_values),
+            (BYTES, n_bytes),
+        ]
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

From ded444f6c96f8ab991e762f65760b42e4d64246c Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 20 Sep 2022 16:01:41 +0200
Subject: Ability to have custom timeouts in request strategy (not used)

---
 src/model/k2v/item_table.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index baa1db4b..7860cb17 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -17,7 +17,7 @@ pub const CONFLICTS: &str = "conflicts";
 pub const VALUES: &str = "values";
 pub const BYTES: &str = "bytes";
 
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
 pub struct K2VItem {
     pub partition: K2VItemPartition,
     pub sort_key: String,
@@ -25,19 +25,19 @@ pub struct K2VItem {
     items: BTreeMap<K2VNodeId, DvvsEntry>,
 }
 
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, Hash, Eq)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)]
 pub struct K2VItemPartition {
     pub bucket_id: Uuid,
     pub partition_key: String,
 }
 
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
 struct DvvsEntry {
     t_discard: u64,
     values: Vec<(u64, DvvsValue)>,
 }
 
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
 pub enum DvvsValue {
     Value(#[serde(with = "serde_bytes")] Vec<u8>),
     Deleted,
 }