diff options
-rw-r--r-- | src/api/k2v/item.rs | 2 | ||||
-rw-r--r-- | src/model/garage.rs | 26 | ||||
-rw-r--r-- | src/model/k2v/history_table.rs | 107 | ||||
-rw-r--r-- | src/model/k2v/item_table.rs | 11 | ||||
-rw-r--r-- | src/model/k2v/mod.rs | 1 | ||||
-rw-r--r-- | src/model/k2v/poll.rs | 75 | ||||
-rw-r--r-- | src/model/k2v/rpc.rs | 62 | ||||
-rw-r--r-- | src/table/data.rs | 27 | ||||
-rw-r--r-- | src/table/schema.rs | 8 | ||||
-rw-r--r-- | src/table/util.rs | 2 |
10 files changed, 290 insertions, 31 deletions
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index f85138c7..9b78bc07 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -211,7 +211,7 @@ pub async fn handle_poll_item( let item = garage .k2v .rpc - .poll( + .poll_item( bucket_id, partition_key, sort_key, diff --git a/src/model/garage.rs b/src/model/garage.rs index ac1846ce..a33265af 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -27,7 +27,7 @@ use crate::index_counter::*; use crate::key_table::*; #[cfg(feature = "k2v")] -use crate::k2v::{item_table::*, poll::*, rpc::*}; +use crate::k2v::{history_table::*, item_table::*, poll::*, rpc::*}; /// An entire Garage full of data pub struct Garage { @@ -70,6 +70,8 @@ pub struct Garage { pub struct GarageK2V { /// Table containing K2V items pub item_table: Arc<Table<K2VItemTable, TableShardedReplication>>, + /// Table containing K2V modification history + pub history_table: Arc<Table<K2VHistoryTable, TableShardedReplication>>, /// Indexing table containing K2V item counters pub counter_table: Arc<IndexCounter<K2VItem>>, /// K2V RPC handler @@ -305,22 +307,42 @@ impl GarageK2V { fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self { info!("Initialize K2V counter table..."); let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db); + info!("Initialize K2V subscription manager..."); let subscriptions = Arc::new(SubscriptionManager::new()); + info!("Initialize K2V item table..."); let item_table = Table::new( K2VItemTable { counter_table: counter_table.clone(), subscriptions: subscriptions.clone(), }, + meta_rep_param.clone(), + system.clone(), + db, + ); + info!("Initialize K2V history table..."); + let history_table = Table::new( + K2VHistoryTable { + subscriptions: subscriptions.clone(), + }, meta_rep_param, system.clone(), db, ); - let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions); + + info!("Initialize K2V RPC handler..."); + let rpc = K2VRpcHandler::new( + system, + db, + item_table.clone(), + history_table.clone(), + subscriptions, + ); Self { item_table, + history_table, counter_table, rpc, } diff --git a/src/model/k2v/history_table.rs b/src/model/k2v/history_table.rs new file mode 100644 index 00000000..6a6e9a10 --- /dev/null +++ b/src/model/k2v/history_table.rs @@ -0,0 +1,107 @@ +use std::sync::Arc; + +use garage_db as db; + +use garage_table::crdt::*; +use garage_table::*; + +use crate::k2v::poll::*; + +mod v08 { + use crate::k2v::causality::K2VNodeId; + pub use crate::k2v::item_table::v08::{DvvsValue, K2VItemPartition}; + use garage_util::crdt; + use serde::{Deserialize, Serialize}; + + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct K2VHistoryEntry { + /// Partition key: a K2V partition + pub partition: K2VItemPartition, + /// Sort key: the node ID and its local counter + pub node_counter: K2VHistorySortKey, + + /// The value of the node's local counter before this entry was updated + pub prev_counter: u64, + /// The timesamp of the update (!= counter, counters are incremented + /// by one, timestamps are real clock timestamps) + pub timestamp: u64, + /// The sort key of the item that was inserted + pub ins_sort_key: String, + /// The inserted value + pub ins_value: DvvsValue, + + /// Whether this history entry is too old and should be deleted + pub deleted: crdt::Bool, + } + + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct K2VHistorySortKey { + pub node: K2VNodeId, + pub counter: u64, + } + + impl garage_util::migrate::InitialFormat for K2VHistoryEntry { + const VERSION_MARKER: &'static [u8] = b"Gk2vhe08"; + } +} + +pub use v08::*; + +impl Crdt for K2VHistoryEntry { + fn merge(&mut self, other: &Self) { + self.deleted.merge(&other.deleted); + } +} + +impl SortKey for K2VHistorySortKey { + type B<'a> = [u8; 16]; + + fn sort_key(&self) -> [u8; 16] { + let mut ret = [0u8; 16]; + ret[0..8].copy_from_slice(&u64::to_be_bytes(self.node)); + ret[8..16].copy_from_slice(&u64::to_be_bytes(self.counter)); + ret + } +} + +impl Entry<K2VItemPartition, K2VHistorySortKey> for K2VHistoryEntry { + fn partition_key(&self) -> &K2VItemPartition { + &self.partition + } + fn sort_key(&self) -> &K2VHistorySortKey { + &self.node_counter + } + fn is_tombstone(&self) -> bool { + self.deleted.get() + } +} + +pub struct K2VHistoryTable { + pub(crate) subscriptions: Arc<SubscriptionManager>, +} + +impl TableSchema for K2VHistoryTable { + const TABLE_NAME: &'static str = "k2v_history"; + + type P = K2VItemPartition; + type S = K2VHistorySortKey; + type E = K2VHistoryEntry; + type Filter = DeletedFilter; + + fn updated( + &self, + _tx: &mut db::Transaction, + _old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { + if let Some(new_ent) = new { + self.subscriptions.notify_range(new_ent); + } + + Ok(()) + } + + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { + filter.apply(entry.deleted.get()) + } +} diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index ce3e4129..90a2f4d0 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -18,7 +18,7 @@ pub const CONFLICTS: &str = "conflicts"; pub const VALUES: &str = "values"; pub const BYTES: &str = "bytes"; -mod v08 { +pub(super) mod v08 { use crate::k2v::causality::K2VNodeId; use garage_util::data::Uuid; use serde::{Deserialize, Serialize}; @@ -73,7 +73,8 @@ impl K2VItem { this_node: Uuid, context: &Option<CausalContext>, new_value: DvvsValue, - ) { + node_counter: u64, + ) -> u64 { if let Some(context) = context { for (node, t_discard) in context.vector_clock.iter() { if let Some(e) = self.items.get_mut(node) { @@ -98,7 +99,9 @@ impl K2VItem { values: vec![], }); let t_prev = e.max_time(); - e.values.push((t_prev + 1, new_value)); + let t_new = std::cmp::max(node_counter + 1, t_prev + 1); + e.values.push((t_new, new_value)); + t_new } /// Extract the causality context of a K2V Item @@ -237,7 +240,7 @@ impl TableSchema for K2VItemTable { // 2. Notify if let Some(new_ent) = new { - self.subscriptions.notify(new_ent); + self.subscriptions.notify_item(new_ent); } Ok(()) diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs index f6a96151..18deabac 100644 --- a/src/model/k2v/mod.rs +++ b/src/model/k2v/mod.rs @@ -1,5 +1,6 @@ pub mod causality; +pub mod history_table; pub mod item_table; pub mod poll; diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs index 93105207..ea3e8d41 100644 --- a/src/model/k2v/poll.rs +++ b/src/model/k2v/poll.rs @@ -4,6 +4,7 @@ use std::sync::Mutex; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast; +use crate::k2v::history_table::*; use crate::k2v::item_table::*; #[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -12,9 +13,18 @@ pub struct PollKey { pub sort_key: String, } +#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PollRange { + pub partition: K2VItemPartition, + pub prefix: Option<String>, + pub start: Option<String>, + pub end: Option<String>, +} + #[derive(Default)] pub struct SubscriptionManager { - subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>, + item_subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>, + range_subscriptions: Mutex<HashMap<PollRange, broadcast::Sender<K2VHistoryEntry>>>, } impl SubscriptionManager { @@ -22,8 +32,10 @@ impl SubscriptionManager { Self::default() } - pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> { - let mut subs = self.subscriptions.lock().unwrap(); + // ---- simple item polling ---- + + pub fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> { + let mut subs = self.item_subscriptions.lock().unwrap(); if let Some(s) = subs.get(key) { s.subscribe() } else { @@ -33,12 +45,12 @@ impl SubscriptionManager { } } - pub fn notify(&self, item: &K2VItem) { + pub fn notify_item(&self, item: &K2VItem) { let key = PollKey { partition: item.partition.clone(), sort_key: item.sort_key.clone(), }; - let mut subs = self.subscriptions.lock().unwrap(); + let mut subs = self.item_subscriptions.lock().unwrap(); if let Some(s) = subs.get(&key) { if s.send(item.clone()).is_err() { // no more subscribers, remove channel from here @@ -47,4 +59,57 @@ impl SubscriptionManager { } } } + + // ---- range polling ---- + + pub fn subscribe_range(&self, key: &PollRange) -> broadcast::Receiver<K2VHistoryEntry> { + let mut subs = self.range_subscriptions.lock().unwrap(); + if let Some(s) = subs.get(key) { + s.subscribe() + } else { + let (tx, rx) = broadcast::channel(8); + subs.insert(key.clone(), tx); + rx + } + } + + pub fn notify_range(&self, entry: &K2VHistoryEntry) { + let mut subs = self.range_subscriptions.lock().unwrap(); + let mut dead_subs = vec![]; + + for (sub, chan) in subs.iter() { + if sub.matches(&entry) { + if chan.send(entry.clone()).is_err() { + dead_subs.push(sub.clone()); + } + } else if chan.receiver_count() == 0 { + dead_subs.push(sub.clone()); + } + } + + for sub in dead_subs.iter() { + subs.remove(sub); + } + } +} + +impl PollRange { + fn matches(&self, entry: &K2VHistoryEntry) -> bool { + entry.partition == self.partition + && self + .prefix + .as_ref() + .map(|x| entry.ins_sort_key.starts_with(x)) + .unwrap_or(true) + && self + .start + .as_ref() + .map(|x| entry.ins_sort_key >= *x) + .unwrap_or(true) + && self + .end + .as_ref() + .map(|x| entry.ins_sort_key < *x) + .unwrap_or(true) + } } diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index f64a7984..1dc396c0 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -6,6 +6,7 @@ //! mean the vector clock gets much larger than needed). use std::collections::HashMap; +use std::convert::TryInto; use std::sync::Arc; use std::time::Duration; @@ -15,9 +16,12 @@ use futures::StreamExt; use serde::{Deserialize, Serialize}; use tokio::select; +use garage_db as db; + use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::*; +use garage_util::time::*; use garage_rpc::system::System; use garage_rpc::*; @@ -26,6 +30,7 @@ use garage_table::replication::{TableReplication, TableShardedReplication}; use garage_table::{PartitionKey, Table}; use crate::k2v::causality::*; +use crate::k2v::history_table::*; use crate::k2v::item_table::*; use crate::k2v::poll::*; @@ -59,6 +64,8 @@ impl Rpc for K2VRpc { pub struct K2VRpcHandler { system: Arc<System>, item_table: Arc<Table<K2VItemTable, TableShardedReplication>>, + history_table: Arc<Table<K2VHistoryTable, TableShardedReplication>>, + local_counter_tree: db::Tree, endpoint: Arc<Endpoint<K2VRpc, Self>>, subscriptions: Arc<SubscriptionManager>, } @@ -66,14 +73,21 @@ pub struct K2VRpcHandler { impl K2VRpcHandler { pub fn new( system: Arc<System>, + db: &db::Db, item_table: Arc<Table<K2VItemTable, TableShardedReplication>>, + history_table: Arc<Table<K2VHistoryTable, TableShardedReplication>>, subscriptions: Arc<SubscriptionManager>, ) -> Arc<Self> { + let local_counter_tree = db + .open_tree("k2v_local_counter") + .expect("Unable to open DB tree for k2v local counter"); let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string()); let rpc_handler = Arc::new(Self { system, item_table, + history_table, + local_counter_tree, endpoint, subscriptions, }); @@ -181,7 +195,7 @@ impl K2VRpcHandler { Ok(()) } - pub async fn poll( + pub async fn poll_item( &self, bucket_id: Uuid, partition_key: String, @@ -273,9 +287,17 @@ impl K2VRpcHandler { } fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> { + let now = now_msec(); + self.item_table .data - .update_entry_with(&item.partition, &item.sort_key, |ent| { + .update_entry_with(&item.partition, &item.sort_key, |tx, ent| { + let old_local_counter = tx + .get(&self.local_counter_tree, b"counter")? + .and_then(|x| x.try_into().ok()) + .map(u64::from_be_bytes) + .unwrap_or_default(); + let mut ent = ent.unwrap_or_else(|| { K2VItem::new( item.partition.bucket_id, @@ -283,13 +305,39 @@ impl K2VRpcHandler { item.sort_key.clone(), ) }); - ent.update(self.system.id, &item.causal_context, item.value.clone()); - ent + let new_local_counter = ent.update( + self.system.id, + &item.causal_context, + item.value.clone(), + old_local_counter, + ); + + tx.insert( + &self.local_counter_tree, + b"counter", + u64::to_be_bytes(new_local_counter), + )?; + + let hist_entry = K2VHistoryEntry { + partition: ent.partition.clone(), + node_counter: K2VHistorySortKey { + node: make_node_id(self.system.id), + counter: new_local_counter, + }, + prev_counter: old_local_counter, + timestamp: now, + ins_sort_key: item.sort_key.clone(), + ins_value: item.value.clone(), + deleted: false.into(), + }; + self.history_table.queue_insert(tx, &hist_entry)?; + + Ok(ent) }) } - async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> { - let mut chan = self.subscriptions.subscribe(key); + async fn handle_poll_item(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> { + let mut chan = self.subscriptions.subscribe_item(key); let mut value = self .item_table @@ -326,7 +374,7 @@ impl EndpointHandler<K2VRpc> for K2VRpcHandler { } => { let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec)); select! { - ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), + ret = self.handle_poll_item(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), _ = delay => Ok(K2VRpc::PollItemResponse(None)), } } diff --git a/src/table/data.rs b/src/table/data.rs index 5c792f1f..d19586f3 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -181,13 +181,17 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> { let update = self.decode_entry(update_bytes)?; - self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent { - Some(mut ent) => { - ent.merge(&update); - ent - } - None => update.clone(), - })?; + self.update_entry_with( + update.partition_key(), + update.sort_key(), + |_tx, ent| match ent { + Some(mut ent) => { + ent.merge(&update); + Ok(ent) + } + None => Ok(update.clone()), + }, + )?; Ok(()) } @@ -195,7 +199,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { &self, partition_key: &F::P, sort_key: &F::S, - f: impl Fn(Option<F::E>) -> F::E, + update_fn: impl Fn(&mut db::Transaction, Option<F::E>) -> db::TxResult<F::E, Error>, ) -> Result<Option<F::E>, Error> { let tree_key = self.tree_key(partition_key, sort_key); @@ -203,10 +207,10 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? { Some(old_bytes) => { let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; - let new_entry = f(Some(old_entry.clone())); + let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?; (Some(old_entry), Some(old_bytes), new_entry) } - None => (None, None, f(None)), + None => (None, None, update_fn(&mut tx, None)?), }; // Changed can be true in two scenarios @@ -335,6 +339,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { .map_err(Error::RmpEncode) .map_err(db::TxError::Abort)?; tx.insert(&self.insert_queue, &tree_key, new_entry)?; + self.insert_queue_notify.notify_one(); Ok(()) @@ -344,7 +349,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { let mut ret = p.hash().to_vec(); - ret.extend(s.sort_key()); + ret.extend(s.sort_key().borrow()); ret } diff --git a/src/table/schema.rs b/src/table/schema.rs index 5cbf6c95..86ff816a 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -31,17 +31,23 @@ impl PartitionKey for FixedBytes32 { /// Trait for field used to sort data pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static { + type B<'a>: std::borrow::Borrow<[u8]>; + /// Get the key used to sort - fn sort_key(&self) -> &[u8]; + fn sort_key(&self) -> Self::B<'_>; } impl SortKey for String { + type B<'a> = &'a [u8]; + fn sort_key(&self) -> &[u8] { self.as_bytes() } } impl SortKey for FixedBytes32 { + type B<'a> = &'a [u8]; + fn sort_key(&self) -> &[u8] { self.as_slice() } diff --git a/src/table/util.rs b/src/table/util.rs index 0b10cf3f..a79e2cba 100644 --- a/src/table/util.rs +++ b/src/table/util.rs @@ -7,6 +7,8 @@ use crate::schema::*; #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct EmptyKey; impl SortKey for EmptyKey { + type B<'a> = &'a [u8]; + fn sort_key(&self) -> &[u8] { &[] } |