diff options
-rw-r--r-- | src/api/k2v/index.rs | 2 | ||||
-rw-r--r-- | src/model/garage.rs | 4 | ||||
-rw-r--r-- | src/model/index_counter.rs | 85 | ||||
-rw-r--r-- | src/model/k2v/counter_table.rs | 20 | ||||
-rw-r--r-- | src/model/k2v/item_table.rs | 102 | ||||
-rw-r--r-- | src/model/k2v/mod.rs | 1 |
6 files changed, 106 insertions, 108 deletions
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index d5db906d..210950bf 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -10,7 +10,7 @@ use garage_rpc::ring::Ring; use garage_table::util::*; use garage_model::garage::Garage; -use garage_model::k2v::counter_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; +use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; use crate::k2v::error::*; use crate::k2v::range::read_range; diff --git a/src/model/garage.rs b/src/model/garage.rs index 280f3dc7..106ff858 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -27,7 +27,7 @@ use crate::key_table::*; #[cfg(feature = "k2v")] use crate::index_counter::*; #[cfg(feature = "k2v")] -use crate::k2v::{counter_table::*, item_table::*, poll::*, rpc::*}; +use crate::k2v::{item_table::*, poll::*, rpc::*}; /// An entire Garage full of data pub struct Garage { @@ -66,7 +66,7 @@ pub struct GarageK2V { /// Table containing K2V items pub item_table: Arc<Table<K2VItemTable, TableShardedReplication>>, /// Indexing table containing K2V item counters - pub counter_table: Arc<IndexCounter<K2VCounterTable>>, + pub counter_table: Arc<IndexCounter<K2VItem>>, /// K2V RPC handler pub rpc: Arc<K2VRpcHandler>, } diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 2602d5d9..109b9828 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -17,25 +17,30 @@ use garage_table::crdt::*; use garage_table::replication::TableShardedReplication; use garage_table::*; -pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static { - const NAME: &'static str; - type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; - type S: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; +pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static { + const COUNTER_TABLE_NAME: &'static str; + + type CP: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + type CS: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + + fn counter_partition_key(&self) -> &Self::CP; + fn counter_sort_key(&self) -> &Self::CS; + fn counts(&self) -> Vec<(&'static str, i64)>; } /// A counter entry in the global table -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct CounterEntry<T: CounterSchema> { - pub pk: T::P, - pub sk: T::S, +#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +pub struct CounterEntry<T: CountedItem> { + pub pk: T::CP, + pub sk: T::CS, pub values: BTreeMap<String, CounterValue>, } -impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> { - fn partition_key(&self) -> &T::P { +impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> { + fn partition_key(&self) -> &T::CP { &self.pk } - fn sort_key(&self) -> &T::S { + fn sort_key(&self) -> &T::CS { &self.sk } fn is_tombstone(&self) -> bool { @@ -45,7 +50,7 @@ impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> { } } -impl<T: CounterSchema> CounterEntry<T> { +impl<T: CountedItem> CounterEntry<T> { pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> { let nodes = &ring.layout.node_id_vec[..]; self.filtered_values_with_nodes(nodes) @@ -78,7 +83,7 @@ pub struct CounterValue { pub node_values: BTreeMap<Uuid, (u64, i64)>, } -impl<T: CounterSchema> Crdt for CounterEntry<T> { +impl<T: CountedItem> Crdt for CounterEntry<T> { fn merge(&mut self, other: &Self) { for (name, e2) in other.values.iter() { if let Some(e) = self.values.get_mut(name) { @@ -104,15 +109,15 @@ impl Crdt for CounterValue { } } -pub struct CounterTable<T: CounterSchema> { +pub struct CounterTable<T: CountedItem> { _phantom_t: PhantomData<T>, } -impl<T: CounterSchema> TableSchema for CounterTable<T> { - const TABLE_NAME: &'static str = T::NAME; +impl<T: CountedItem> TableSchema for CounterTable<T> { + const TABLE_NAME: &'static str = T::COUNTER_TABLE_NAME; - type P = T::P; - type S = T::S; + type P = T::CP; + type S = T::CS; type E = CounterEntry<T>; type Filter = (DeletedFilter, Vec<Uuid>); @@ -131,14 +136,14 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> { // ---- -pub struct IndexCounter<T: CounterSchema> { +pub struct IndexCounter<T: CountedItem> { this_node: Uuid, local_counter: db::Tree, - propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>, + propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry)>, pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>, } -impl<T: CounterSchema> IndexCounter<T> { +impl<T: CountedItem> IndexCounter<T> { pub fn new( system: Arc<System>, replication: TableShardedReplication, @@ -151,7 +156,7 @@ impl<T: CounterSchema> IndexCounter<T> { let this = Arc::new(Self { this_node: system.id, local_counter: db - .open_tree(format!("local_counter:{}", T::NAME)) + .open_tree(format!("local_counter:{}", T::COUNTER_TABLE_NAME)) .expect("Unable to open local counter tree"), propagate_tx, table: Table::new( @@ -166,7 +171,7 @@ impl<T: CounterSchema> IndexCounter<T> { let this2 = this.clone(); background.spawn_worker( - format!("{} index counter propagator", T::NAME), + format!("{} index counter propagator", T::COUNTER_TABLE_NAME), move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit), ); this @@ -175,10 +180,26 @@ impl<T: CounterSchema> IndexCounter<T> { pub fn count( &self, tx: &mut db::Transaction, - pk: &T::P, - sk: &T::S, - counts: &[(&str, i64)], + old: Option<&T>, + new: Option<&T>, ) -> db::TxResult<(), Error> { + let pk = old + .map(|e| e.counter_partition_key()) + .unwrap_or_else(|| new.unwrap().counter_partition_key()); + let sk = old + .map(|e| e.counter_sort_key()) + .unwrap_or_else(|| new.unwrap().counter_sort_key()); + + // calculate counter differences + let mut counts = HashMap::new(); + for (k, v) in old.map(|x| x.counts()).unwrap_or_default() { + *counts.entry(k).or_insert(0) -= v; + } + for (k, v) in new.map(|x| x.counts()).unwrap_or_default() { + *counts.entry(k).or_insert(0) += v; + } + + // update local counter table let tree_key = self.table.data.tree_key(pk, sk); let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { @@ -213,7 +234,7 @@ impl<T: CounterSchema> IndexCounter<T> { async fn propagate_loop( self: Arc<Self>, - mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>, + mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry)>, must_exit: watch::Receiver<bool>, ) { // This loop batches updates to counters to be sent all at once. @@ -255,10 +276,10 @@ impl<T: CounterSchema> IndexCounter<T> { if let Err(e) = self.table.insert_many(entries).await { errors += 1; if errors >= 2 && *must_exit.borrow() { - error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e); + error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e); break; } - warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors); + warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors); tokio::time::sleep(Duration::from_secs(5)).await; continue; } @@ -280,11 +301,11 @@ struct LocalCounterEntry { } impl LocalCounterEntry { - fn into_counter_entry<T: CounterSchema>( + fn into_counter_entry<T: CountedItem>( self, this_node: Uuid, - pk: T::P, - sk: T::S, + pk: T::CP, + sk: T::CS, ) -> CounterEntry<T> { CounterEntry { pk, diff --git a/src/model/k2v/counter_table.rs b/src/model/k2v/counter_table.rs deleted file mode 100644 index 4856eb2b..00000000 --- a/src/model/k2v/counter_table.rs +++ /dev/null @@ -1,20 +0,0 @@ -use garage_util::data::*; - -use crate::index_counter::*; - -pub const ENTRIES: &str = "entries"; -pub const CONFLICTS: &str = "conflicts"; -pub const VALUES: &str = "values"; -pub const BYTES: &str = "bytes"; - -#[derive(PartialEq, Clone)] -pub struct K2VCounterTable; - -impl CounterSchema for K2VCounterTable { - const NAME: &'static str = "k2v_index_counter"; - - // Partition key = bucket id - type P = Uuid; - // Sort key = K2V item's partition key - type S = String; -} diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index 991fe66d..d02bdd26 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -10,9 +10,13 @@ use garage_table::*; use crate::index_counter::*; use crate::k2v::causality::*; -use crate::k2v::counter_table::*; use crate::k2v::poll::*; +pub const ENTRIES: &str = "entries"; +pub const CONFLICTS: &str = "conflicts"; +pub const VALUES: &str = "values"; +pub const BYTES: &str = "bytes"; + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct K2VItem { pub partition: K2VItemPartition, @@ -112,27 +116,6 @@ impl K2VItem { ent.discard(); } } - - // returns counters: (non-deleted entries, conflict entries, non-tombstone values, bytes used) - fn stats(&self) -> (i64, i64, i64, i64) { - let values = self.values(); - - let n_entries = if self.is_tombstone() { 0 } else { 1 }; - let n_conflicts = if values.len() > 1 { 1 } else { 0 }; - let n_values = values - .iter() - .filter(|v| matches!(v, DvvsValue::Value(_))) - .count() as i64; - let n_bytes = values - .iter() - .map(|v| match v { - DvvsValue::Deleted => 0, - DvvsValue::Value(v) => v.len() as i64, - }) - .sum(); - - (n_entries, n_conflicts, n_values, n_bytes) - } } impl DvvsEntry { @@ -204,7 +187,7 @@ impl Entry<K2VItemPartition, String> for K2VItem { } pub struct K2VItemTable { - pub(crate) counter_table: Arc<IndexCounter<K2VCounterTable>>, + pub(crate) counter_table: Arc<IndexCounter<K2VItem>>, pub(crate) subscriptions: Arc<SubscriptionManager>, } @@ -229,40 +212,14 @@ impl TableSchema for K2VItemTable { new: Option<&Self::E>, ) -> db::TxOpResult<()> { // 1. Count - let (old_entries, old_conflicts, old_values, old_bytes) = match old { - None => (0, 0, 0, 0), - Some(e) => e.stats(), - }; - let (new_entries, new_conflicts, new_values, new_bytes) = match new { - None => (0, 0, 0, 0), - Some(e) => e.stats(), - }; - - let count_pk = old - .map(|e| e.partition.bucket_id) - .unwrap_or_else(|| new.unwrap().partition.bucket_id); - let count_sk = old - .map(|e| &e.partition.partition_key) - .unwrap_or_else(|| &new.unwrap().partition.partition_key); - - let counter_res = self.counter_table.count( - tx, - &count_pk, - count_sk, - &[ - (ENTRIES, new_entries - old_entries), - (CONFLICTS, new_conflicts - old_conflicts), - (VALUES, new_values - old_values), - (BYTES, new_bytes - old_bytes), - ], - ); + let counter_res = self.counter_table.count(tx, old, new); if let Err(e) = db::unabort(counter_res)? { // This result can be returned by `counter_table.count()` for instance // if messagepack serialization or deserialization fails at some step. // Warn admin but ignore this error for now, that's all we can do. error!( - "Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!", - count_pk, count_sk, e + "Unable to update K2V item counter: {}. Index values will be wrong!", + e ); } @@ -282,6 +239,47 @@ impl TableSchema for K2VItemTable { } } +impl CountedItem for K2VItem { + const COUNTER_TABLE_NAME: &'static str = "k2v_index_counter"; + + // Partition key = bucket id + type CP = Uuid; + // Sort key = K2V item's partition key + type CS = String; + + fn counter_partition_key(&self) -> &Uuid { + &self.partition.bucket_id + } + fn counter_sort_key(&self) -> &String { + &self.partition.partition_key + } + + fn counts(&self) -> Vec<(&'static str, i64)> { + let values = self.values(); + + let n_entries = if self.is_tombstone() { 0 } else { 1 }; + let n_conflicts = if values.len() > 1 { 1 } else { 0 }; + let n_values = values + .iter() + .filter(|v| matches!(v, DvvsValue::Value(_))) + .count() as i64; + let n_bytes = values + .iter() + .map(|v| match v { + DvvsValue::Deleted => 0, + DvvsValue::Value(v) => v.len() as i64, + }) + .sum(); + + vec![ + (ENTRIES, n_entries), + (CONFLICTS, n_conflicts), + (VALUES, n_values), + (BYTES, n_bytes), + ] + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs index 664172a6..f6a96151 100644 --- a/src/model/k2v/mod.rs +++ b/src/model/k2v/mod.rs @@ -1,6 +1,5 @@ pub mod causality; -pub mod counter_table; pub mod item_table; pub mod poll; |