From 32715d462e44d9b6dee84ff3c1eb6163d4be4123 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 9 Jan 2023 14:53:27 +0100 Subject: history table refactoring --- src/model/garage.rs | 2 +- src/model/k2v/history_table.rs | 20 +++---- src/model/k2v/item_table.rs | 2 +- src/model/k2v/mod.rs | 3 +- src/model/k2v/poll.rs | 115 ----------------------------------------- src/model/k2v/rpc.rs | 11 ++-- src/model/k2v/sub.rs | 115 +++++++++++++++++++++++++++++++++++++++++ 7 files changed, 134 insertions(+), 134 deletions(-) delete mode 100644 src/model/k2v/poll.rs create mode 100644 src/model/k2v/sub.rs (limited to 'src/model') diff --git a/src/model/garage.rs b/src/model/garage.rs index a33265af..c0ffdd31 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -27,7 +27,7 @@ use crate::index_counter::*; use crate::key_table::*; #[cfg(feature = "k2v")] -use crate::k2v::{history_table::*, item_table::*, poll::*, rpc::*}; +use crate::k2v::{history_table::*, item_table::*, sub::*, rpc::*}; /// An entire Garage full of data pub struct Garage { diff --git a/src/model/k2v/history_table.rs b/src/model/k2v/history_table.rs index 6a6e9a10..9df03f5d 100644 --- a/src/model/k2v/history_table.rs +++ b/src/model/k2v/history_table.rs @@ -5,18 +5,21 @@ use garage_db as db; use garage_table::crdt::*; use garage_table::*; -use crate::k2v::poll::*; +use crate::k2v::sub::*; mod v08 { use crate::k2v::causality::K2VNodeId; - pub use crate::k2v::item_table::v08::{DvvsValue, K2VItemPartition}; + pub use crate::k2v::item_table::v08::{DvvsValue, K2VItem, K2VItemPartition}; use garage_util::crdt; use serde::{Deserialize, Serialize}; #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct K2VHistoryEntry { - /// Partition key: a K2V partition - pub partition: K2VItemPartition, + // Partition key: the partition key of ins_item + + /// The inserted item + pub ins_item: K2VItem, + /// Sort key: the node ID and its local counter pub node_counter: K2VHistorySortKey, @@ -25,12 +28,8 @@ mod v08 { /// The timesamp of the update (!= counter, counters are incremented /// by one, timestamps are real clock timestamps) pub timestamp: u64, - /// The sort key of the item that was inserted - pub ins_sort_key: String, - /// The inserted value - pub ins_value: DvvsValue, - /// Whether this history entry is too old and should be deleted + /// Mark this history entry for deletion pub deleted: crdt::Bool, } @@ -49,6 +48,7 @@ pub use v08::*; impl Crdt for K2VHistoryEntry { fn merge(&mut self, other: &Self) { + self.ins_item.merge(&other.ins_item); self.deleted.merge(&other.deleted); } } @@ -66,7 +66,7 @@ impl SortKey for K2VHistorySortKey { impl Entry for K2VHistoryEntry { fn partition_key(&self) -> &K2VItemPartition { - &self.partition + &self.ins_item.partition } fn sort_key(&self) -> &K2VHistorySortKey { &self.node_counter diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index 90a2f4d0..cf60fd3f 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -11,7 +11,7 @@ use garage_table::*; use crate::index_counter::*; use crate::k2v::causality::*; -use crate::k2v::poll::*; +use crate::k2v::sub::*; pub const ENTRIES: &str = "entries"; pub const CONFLICTS: &str = "conflicts"; diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs index 18deabac..4f7de5b7 100644 --- a/src/model/k2v/mod.rs +++ b/src/model/k2v/mod.rs @@ -3,5 +3,6 @@ pub mod causality; pub mod history_table; pub mod item_table; -pub mod poll; +pub(crate) mod sub; + pub mod rpc; diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs deleted file mode 100644 index ea3e8d41..00000000 --- a/src/model/k2v/poll.rs +++ /dev/null @@ -1,115 +0,0 @@ -use std::collections::HashMap; -use std::sync::Mutex; - -use serde::{Deserialize, Serialize}; -use tokio::sync::broadcast; - -use crate::k2v::history_table::*; -use crate::k2v::item_table::*; - -#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct PollKey { - pub partition: K2VItemPartition, - pub sort_key: String, -} - -#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct PollRange { - pub partition: K2VItemPartition, - pub prefix: Option, - pub start: Option, - pub end: Option, -} - -#[derive(Default)] -pub struct SubscriptionManager { - item_subscriptions: Mutex>>, - range_subscriptions: Mutex>>, -} - -impl SubscriptionManager { - pub fn new() -> Self { - Self::default() - } - - // ---- simple item polling ---- - - pub fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver { - let mut subs = self.item_subscriptions.lock().unwrap(); - if let Some(s) = subs.get(key) { - s.subscribe() - } else { - let (tx, rx) = broadcast::channel(8); - subs.insert(key.clone(), tx); - rx - } - } - - pub fn notify_item(&self, item: &K2VItem) { - let key = PollKey { - partition: item.partition.clone(), - sort_key: item.sort_key.clone(), - }; - let mut subs = self.item_subscriptions.lock().unwrap(); - if let Some(s) = subs.get(&key) { - if s.send(item.clone()).is_err() { - // no more subscribers, remove channel from here - // (we will re-create it later if we need to subscribe again) - subs.remove(&key); - } - } - } - - // ---- range polling ---- - - pub fn subscribe_range(&self, key: &PollRange) -> broadcast::Receiver { - let mut subs = self.range_subscriptions.lock().unwrap(); - if let Some(s) = subs.get(key) { - s.subscribe() - } else { - let (tx, rx) = broadcast::channel(8); - subs.insert(key.clone(), tx); - rx - } - } - - pub fn notify_range(&self, entry: &K2VHistoryEntry) { - let mut subs = self.range_subscriptions.lock().unwrap(); - let mut dead_subs = vec![]; - - for (sub, chan) in subs.iter() { - if sub.matches(&entry) { - if chan.send(entry.clone()).is_err() { - dead_subs.push(sub.clone()); - } - } else if chan.receiver_count() == 0 { - dead_subs.push(sub.clone()); - } - } - - for sub in dead_subs.iter() { - subs.remove(sub); - } - } -} - -impl PollRange { - fn matches(&self, entry: &K2VHistoryEntry) -> bool { - entry.partition == self.partition - && self - .prefix - .as_ref() - .map(|x| entry.ins_sort_key.starts_with(x)) - .unwrap_or(true) - && self - .start - .as_ref() - .map(|x| entry.ins_sort_key >= *x) - .unwrap_or(true) - && self - .end - .as_ref() - .map(|x| entry.ins_sort_key < *x) - .unwrap_or(true) - } -} diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 1dc396c0..c3cb5f9f 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -32,7 +32,7 @@ use garage_table::{PartitionKey, Table}; use crate::k2v::causality::*; use crate::k2v::history_table::*; use crate::k2v::item_table::*; -use crate::k2v::poll::*; +use crate::k2v::sub::*; /// RPC messages for K2V #[derive(Debug, Serialize, Deserialize)] @@ -292,8 +292,9 @@ impl K2VRpcHandler { self.item_table .data .update_entry_with(&item.partition, &item.sort_key, |tx, ent| { + let local_counter_key = item.partition.hash(); let old_local_counter = tx - .get(&self.local_counter_tree, b"counter")? + .get(&self.local_counter_tree, &local_counter_key)? .and_then(|x| x.try_into().ok()) .map(u64::from_be_bytes) .unwrap_or_default(); @@ -314,20 +315,18 @@ impl K2VRpcHandler { tx.insert( &self.local_counter_tree, - b"counter", + &local_counter_key, u64::to_be_bytes(new_local_counter), )?; let hist_entry = K2VHistoryEntry { - partition: ent.partition.clone(), + ins_item: ent.clone(), node_counter: K2VHistorySortKey { node: make_node_id(self.system.id), counter: new_local_counter, }, prev_counter: old_local_counter, timestamp: now, - ins_sort_key: item.sort_key.clone(), - ins_value: item.value.clone(), deleted: false.into(), }; self.history_table.queue_insert(tx, &hist_entry)?; diff --git a/src/model/k2v/sub.rs b/src/model/k2v/sub.rs new file mode 100644 index 00000000..1dcca4d5 --- /dev/null +++ b/src/model/k2v/sub.rs @@ -0,0 +1,115 @@ +use std::collections::HashMap; +use std::sync::Mutex; + +use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast; + +use crate::k2v::history_table::*; +use crate::k2v::item_table::*; + +#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PollKey { + pub partition: K2VItemPartition, + pub sort_key: String, +} + +#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PollRange { + pub partition: K2VItemPartition, + pub prefix: Option, + pub start: Option, + pub end: Option, +} + +#[derive(Default)] +pub struct SubscriptionManager { + item_subscriptions: Mutex>>, + range_subscriptions: Mutex>>, +} + +impl SubscriptionManager { + pub fn new() -> Self { + Self::default() + } + + // ---- simple item polling ---- + + pub fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver { + let mut subs = self.item_subscriptions.lock().unwrap(); + if let Some(s) = subs.get(key) { + s.subscribe() + } else { + let (tx, rx) = broadcast::channel(8); + subs.insert(key.clone(), tx); + rx + } + } + + pub fn notify_item(&self, item: &K2VItem) { + let key = PollKey { + partition: item.partition.clone(), + sort_key: item.sort_key.clone(), + }; + let mut subs = self.item_subscriptions.lock().unwrap(); + if let Some(s) = subs.get(&key) { + if s.send(item.clone()).is_err() { + // no more subscribers, remove channel from here + // (we will re-create it later if we need to subscribe again) + subs.remove(&key); + } + } + } + + // ---- range polling ---- + + pub fn subscribe_range(&self, key: &PollRange) -> broadcast::Receiver { + let mut subs = self.range_subscriptions.lock().unwrap(); + if let Some(s) = subs.get(key) { + s.subscribe() + } else { + let (tx, rx) = broadcast::channel(8); + subs.insert(key.clone(), tx); + rx + } + } + + pub fn notify_range(&self, entry: &K2VHistoryEntry) { + let mut subs = self.range_subscriptions.lock().unwrap(); + let mut dead_subs = vec![]; + + for (sub, chan) in subs.iter() { + if sub.matches(&entry) { + if chan.send(entry.clone()).is_err() { + dead_subs.push(sub.clone()); + } + } else if chan.receiver_count() == 0 { + dead_subs.push(sub.clone()); + } + } + + for sub in dead_subs.iter() { + subs.remove(sub); + } + } +} + +impl PollRange { + fn matches(&self, entry: &K2VHistoryEntry) -> bool { + entry.ins_item.partition == self.partition + && self + .prefix + .as_ref() + .map(|x| entry.ins_item.sort_key.starts_with(x)) + .unwrap_or(true) + && self + .start + .as_ref() + .map(|x| entry.ins_item.sort_key >= *x) + .unwrap_or(true) + && self + .end + .as_ref() + .map(|x| entry.ins_item.sort_key < *x) + .unwrap_or(true) + } +} -- cgit v1.2.3