aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/api/k2v/item.rs2
-rw-r--r--src/model/garage.rs26
-rw-r--r--src/model/k2v/history_table.rs107
-rw-r--r--src/model/k2v/item_table.rs11
-rw-r--r--src/model/k2v/mod.rs1
-rw-r--r--src/model/k2v/poll.rs75
-rw-r--r--src/model/k2v/rpc.rs62
-rw-r--r--src/table/data.rs27
-rw-r--r--src/table/schema.rs8
-rw-r--r--src/table/util.rs2
10 files changed, 290 insertions, 31 deletions
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index f85138c7..9b78bc07 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -211,7 +211,7 @@ pub async fn handle_poll_item(
let item = garage
.k2v
.rpc
- .poll(
+ .poll_item(
bucket_id,
partition_key,
sort_key,
diff --git a/src/model/garage.rs b/src/model/garage.rs
index ac1846ce..a33265af 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -27,7 +27,7 @@ use crate::index_counter::*;
use crate::key_table::*;
#[cfg(feature = "k2v")]
-use crate::k2v::{item_table::*, poll::*, rpc::*};
+use crate::k2v::{history_table::*, item_table::*, poll::*, rpc::*};
/// An entire Garage full of data
pub struct Garage {
@@ -70,6 +70,8 @@ pub struct Garage {
pub struct GarageK2V {
/// Table containing K2V items
pub item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+ /// Table containing K2V modification history
+ pub history_table: Arc<Table<K2VHistoryTable, TableShardedReplication>>,
/// Indexing table containing K2V item counters
pub counter_table: Arc<IndexCounter<K2VItem>>,
/// K2V RPC handler
@@ -305,22 +307,42 @@ impl GarageK2V {
fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
info!("Initialize K2V counter table...");
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
+
info!("Initialize K2V subscription manager...");
let subscriptions = Arc::new(SubscriptionManager::new());
+
info!("Initialize K2V item table...");
let item_table = Table::new(
K2VItemTable {
counter_table: counter_table.clone(),
subscriptions: subscriptions.clone(),
},
+ meta_rep_param.clone(),
+ system.clone(),
+ db,
+ );
+ info!("Initialize K2V history table...");
+ let history_table = Table::new(
+ K2VHistoryTable {
+ subscriptions: subscriptions.clone(),
+ },
meta_rep_param,
system.clone(),
db,
);
- let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions);
+
+ info!("Initialize K2V RPC handler...");
+ let rpc = K2VRpcHandler::new(
+ system,
+ db,
+ item_table.clone(),
+ history_table.clone(),
+ subscriptions,
+ );
Self {
item_table,
+ history_table,
counter_table,
rpc,
}
diff --git a/src/model/k2v/history_table.rs b/src/model/k2v/history_table.rs
new file mode 100644
index 00000000..6a6e9a10
--- /dev/null
+++ b/src/model/k2v/history_table.rs
@@ -0,0 +1,107 @@
+use std::sync::Arc;
+
+use garage_db as db;
+
+use garage_table::crdt::*;
+use garage_table::*;
+
+use crate::k2v::poll::*;
+
+mod v08 {
+ use crate::k2v::causality::K2VNodeId;
+ pub use crate::k2v::item_table::v08::{DvvsValue, K2VItemPartition};
+ use garage_util::crdt;
+ use serde::{Deserialize, Serialize};
+
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct K2VHistoryEntry {
+ /// Partition key: a K2V partition
+ pub partition: K2VItemPartition,
+ /// Sort key: the node ID and its local counter
+ pub node_counter: K2VHistorySortKey,
+
+ /// The value of the node's local counter before this entry was updated
+ pub prev_counter: u64,
+ /// The timesamp of the update (!= counter, counters are incremented
+ /// by one, timestamps are real clock timestamps)
+ pub timestamp: u64,
+ /// The sort key of the item that was inserted
+ pub ins_sort_key: String,
+ /// The inserted value
+ pub ins_value: DvvsValue,
+
+ /// Whether this history entry is too old and should be deleted
+ pub deleted: crdt::Bool,
+ }
+
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct K2VHistorySortKey {
+ pub node: K2VNodeId,
+ pub counter: u64,
+ }
+
+ impl garage_util::migrate::InitialFormat for K2VHistoryEntry {
+ const VERSION_MARKER: &'static [u8] = b"Gk2vhe08";
+ }
+}
+
+pub use v08::*;
+
+impl Crdt for K2VHistoryEntry {
+ fn merge(&mut self, other: &Self) {
+ self.deleted.merge(&other.deleted);
+ }
+}
+
+impl SortKey for K2VHistorySortKey {
+ type B<'a> = [u8; 16];
+
+ fn sort_key(&self) -> [u8; 16] {
+ let mut ret = [0u8; 16];
+ ret[0..8].copy_from_slice(&u64::to_be_bytes(self.node));
+ ret[8..16].copy_from_slice(&u64::to_be_bytes(self.counter));
+ ret
+ }
+}
+
+impl Entry<K2VItemPartition, K2VHistorySortKey> for K2VHistoryEntry {
+ fn partition_key(&self) -> &K2VItemPartition {
+ &self.partition
+ }
+ fn sort_key(&self) -> &K2VHistorySortKey {
+ &self.node_counter
+ }
+ fn is_tombstone(&self) -> bool {
+ self.deleted.get()
+ }
+}
+
+pub struct K2VHistoryTable {
+ pub(crate) subscriptions: Arc<SubscriptionManager>,
+}
+
+impl TableSchema for K2VHistoryTable {
+ const TABLE_NAME: &'static str = "k2v_history";
+
+ type P = K2VItemPartition;
+ type S = K2VHistorySortKey;
+ type E = K2VHistoryEntry;
+ type Filter = DeletedFilter;
+
+ fn updated(
+ &self,
+ _tx: &mut db::Transaction,
+ _old: Option<&Self::E>,
+ new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
+ if let Some(new_ent) = new {
+ self.subscriptions.notify_range(new_ent);
+ }
+
+ Ok(())
+ }
+
+ fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
+ filter.apply(entry.deleted.get())
+ }
+}
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index ce3e4129..90a2f4d0 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -18,7 +18,7 @@ pub const CONFLICTS: &str = "conflicts";
pub const VALUES: &str = "values";
pub const BYTES: &str = "bytes";
-mod v08 {
+pub(super) mod v08 {
use crate::k2v::causality::K2VNodeId;
use garage_util::data::Uuid;
use serde::{Deserialize, Serialize};
@@ -73,7 +73,8 @@ impl K2VItem {
this_node: Uuid,
context: &Option<CausalContext>,
new_value: DvvsValue,
- ) {
+ node_counter: u64,
+ ) -> u64 {
if let Some(context) = context {
for (node, t_discard) in context.vector_clock.iter() {
if let Some(e) = self.items.get_mut(node) {
@@ -98,7 +99,9 @@ impl K2VItem {
values: vec![],
});
let t_prev = e.max_time();
- e.values.push((t_prev + 1, new_value));
+ let t_new = std::cmp::max(node_counter + 1, t_prev + 1);
+ e.values.push((t_new, new_value));
+ t_new
}
/// Extract the causality context of a K2V Item
@@ -237,7 +240,7 @@ impl TableSchema for K2VItemTable {
// 2. Notify
if let Some(new_ent) = new {
- self.subscriptions.notify(new_ent);
+ self.subscriptions.notify_item(new_ent);
}
Ok(())
diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs
index f6a96151..18deabac 100644
--- a/src/model/k2v/mod.rs
+++ b/src/model/k2v/mod.rs
@@ -1,5 +1,6 @@
pub mod causality;
+pub mod history_table;
pub mod item_table;
pub mod poll;
diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs
index 93105207..ea3e8d41 100644
--- a/src/model/k2v/poll.rs
+++ b/src/model/k2v/poll.rs
@@ -4,6 +4,7 @@ use std::sync::Mutex;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
+use crate::k2v::history_table::*;
use crate::k2v::item_table::*;
#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -12,9 +13,18 @@ pub struct PollKey {
pub sort_key: String,
}
+#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct PollRange {
+ pub partition: K2VItemPartition,
+ pub prefix: Option<String>,
+ pub start: Option<String>,
+ pub end: Option<String>,
+}
+
#[derive(Default)]
pub struct SubscriptionManager {
- subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>,
+ item_subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>,
+ range_subscriptions: Mutex<HashMap<PollRange, broadcast::Sender<K2VHistoryEntry>>>,
}
impl SubscriptionManager {
@@ -22,8 +32,10 @@ impl SubscriptionManager {
Self::default()
}
- pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
- let mut subs = self.subscriptions.lock().unwrap();
+ // ---- simple item polling ----
+
+ pub fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
+ let mut subs = self.item_subscriptions.lock().unwrap();
if let Some(s) = subs.get(key) {
s.subscribe()
} else {
@@ -33,12 +45,12 @@ impl SubscriptionManager {
}
}
- pub fn notify(&self, item: &K2VItem) {
+ pub fn notify_item(&self, item: &K2VItem) {
let key = PollKey {
partition: item.partition.clone(),
sort_key: item.sort_key.clone(),
};
- let mut subs = self.subscriptions.lock().unwrap();
+ let mut subs = self.item_subscriptions.lock().unwrap();
if let Some(s) = subs.get(&key) {
if s.send(item.clone()).is_err() {
// no more subscribers, remove channel from here
@@ -47,4 +59,57 @@ impl SubscriptionManager {
}
}
}
+
+ // ---- range polling ----
+
+ pub fn subscribe_range(&self, key: &PollRange) -> broadcast::Receiver<K2VHistoryEntry> {
+ let mut subs = self.range_subscriptions.lock().unwrap();
+ if let Some(s) = subs.get(key) {
+ s.subscribe()
+ } else {
+ let (tx, rx) = broadcast::channel(8);
+ subs.insert(key.clone(), tx);
+ rx
+ }
+ }
+
+ pub fn notify_range(&self, entry: &K2VHistoryEntry) {
+ let mut subs = self.range_subscriptions.lock().unwrap();
+ let mut dead_subs = vec![];
+
+ for (sub, chan) in subs.iter() {
+ if sub.matches(&entry) {
+ if chan.send(entry.clone()).is_err() {
+ dead_subs.push(sub.clone());
+ }
+ } else if chan.receiver_count() == 0 {
+ dead_subs.push(sub.clone());
+ }
+ }
+
+ for sub in dead_subs.iter() {
+ subs.remove(sub);
+ }
+ }
+}
+
+impl PollRange {
+ fn matches(&self, entry: &K2VHistoryEntry) -> bool {
+ entry.partition == self.partition
+ && self
+ .prefix
+ .as_ref()
+ .map(|x| entry.ins_sort_key.starts_with(x))
+ .unwrap_or(true)
+ && self
+ .start
+ .as_ref()
+ .map(|x| entry.ins_sort_key >= *x)
+ .unwrap_or(true)
+ && self
+ .end
+ .as_ref()
+ .map(|x| entry.ins_sort_key < *x)
+ .unwrap_or(true)
+ }
}
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index f64a7984..1dc396c0 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -6,6 +6,7 @@
//! mean the vector clock gets much larger than needed).
use std::collections::HashMap;
+use std::convert::TryInto;
use std::sync::Arc;
use std::time::Duration;
@@ -15,9 +16,12 @@ use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::select;
+use garage_db as db;
+
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::time::*;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -26,6 +30,7 @@ use garage_table::replication::{TableReplication, TableShardedReplication};
use garage_table::{PartitionKey, Table};
use crate::k2v::causality::*;
+use crate::k2v::history_table::*;
use crate::k2v::item_table::*;
use crate::k2v::poll::*;
@@ -59,6 +64,8 @@ impl Rpc for K2VRpc {
pub struct K2VRpcHandler {
system: Arc<System>,
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+ history_table: Arc<Table<K2VHistoryTable, TableShardedReplication>>,
+ local_counter_tree: db::Tree,
endpoint: Arc<Endpoint<K2VRpc, Self>>,
subscriptions: Arc<SubscriptionManager>,
}
@@ -66,14 +73,21 @@ pub struct K2VRpcHandler {
impl K2VRpcHandler {
pub fn new(
system: Arc<System>,
+ db: &db::Db,
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+ history_table: Arc<Table<K2VHistoryTable, TableShardedReplication>>,
subscriptions: Arc<SubscriptionManager>,
) -> Arc<Self> {
+ let local_counter_tree = db
+ .open_tree("k2v_local_counter")
+ .expect("Unable to open DB tree for k2v local counter");
let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string());
let rpc_handler = Arc::new(Self {
system,
item_table,
+ history_table,
+ local_counter_tree,
endpoint,
subscriptions,
});
@@ -181,7 +195,7 @@ impl K2VRpcHandler {
Ok(())
}
- pub async fn poll(
+ pub async fn poll_item(
&self,
bucket_id: Uuid,
partition_key: String,
@@ -273,9 +287,17 @@ impl K2VRpcHandler {
}
fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
+ let now = now_msec();
+
self.item_table
.data
- .update_entry_with(&item.partition, &item.sort_key, |ent| {
+ .update_entry_with(&item.partition, &item.sort_key, |tx, ent| {
+ let old_local_counter = tx
+ .get(&self.local_counter_tree, b"counter")?
+ .and_then(|x| x.try_into().ok())
+ .map(u64::from_be_bytes)
+ .unwrap_or_default();
+
let mut ent = ent.unwrap_or_else(|| {
K2VItem::new(
item.partition.bucket_id,
@@ -283,13 +305,39 @@ impl K2VRpcHandler {
item.sort_key.clone(),
)
});
- ent.update(self.system.id, &item.causal_context, item.value.clone());
- ent
+ let new_local_counter = ent.update(
+ self.system.id,
+ &item.causal_context,
+ item.value.clone(),
+ old_local_counter,
+ );
+
+ tx.insert(
+ &self.local_counter_tree,
+ b"counter",
+ u64::to_be_bytes(new_local_counter),
+ )?;
+
+ let hist_entry = K2VHistoryEntry {
+ partition: ent.partition.clone(),
+ node_counter: K2VHistorySortKey {
+ node: make_node_id(self.system.id),
+ counter: new_local_counter,
+ },
+ prev_counter: old_local_counter,
+ timestamp: now,
+ ins_sort_key: item.sort_key.clone(),
+ ins_value: item.value.clone(),
+ deleted: false.into(),
+ };
+ self.history_table.queue_insert(tx, &hist_entry)?;
+
+ Ok(ent)
})
}
- async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
- let mut chan = self.subscriptions.subscribe(key);
+ async fn handle_poll_item(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
+ let mut chan = self.subscriptions.subscribe_item(key);
let mut value = self
.item_table
@@ -326,7 +374,7 @@ impl EndpointHandler<K2VRpc> for K2VRpcHandler {
} => {
let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
select! {
- ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
+ ret = self.handle_poll_item(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
_ = delay => Ok(K2VRpc::PollItemResponse(None)),
}
}
diff --git a/src/table/data.rs b/src/table/data.rs
index 5c792f1f..d19586f3 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -181,13 +181,17 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
let update = self.decode_entry(update_bytes)?;
- self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent {
- Some(mut ent) => {
- ent.merge(&update);
- ent
- }
- None => update.clone(),
- })?;
+ self.update_entry_with(
+ update.partition_key(),
+ update.sort_key(),
+ |_tx, ent| match ent {
+ Some(mut ent) => {
+ ent.merge(&update);
+ Ok(ent)
+ }
+ None => Ok(update.clone()),
+ },
+ )?;
Ok(())
}
@@ -195,7 +199,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
&self,
partition_key: &F::P,
sort_key: &F::S,
- f: impl Fn(Option<F::E>) -> F::E,
+ update_fn: impl Fn(&mut db::Transaction, Option<F::E>) -> db::TxResult<F::E, Error>,
) -> Result<Option<F::E>, Error> {
let tree_key = self.tree_key(partition_key, sort_key);
@@ -203,10 +207,10 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
Some(old_bytes) => {
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
- let new_entry = f(Some(old_entry.clone()));
+ let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?;
(Some(old_entry), Some(old_bytes), new_entry)
}
- None => (None, None, f(None)),
+ None => (None, None, update_fn(&mut tx, None)?),
};
// Changed can be true in two scenarios
@@ -335,6 +339,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
.map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?;
tx.insert(&self.insert_queue, &tree_key, new_entry)?;
+
self.insert_queue_notify.notify_one();
Ok(())
@@ -344,7 +349,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
let mut ret = p.hash().to_vec();
- ret.extend(s.sort_key());
+ ret.extend(s.sort_key().borrow());
ret
}
diff --git a/src/table/schema.rs b/src/table/schema.rs
index 5cbf6c95..86ff816a 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -31,17 +31,23 @@ impl PartitionKey for FixedBytes32 {
/// Trait for field used to sort data
pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
+ type B<'a>: std::borrow::Borrow<[u8]>;
+
/// Get the key used to sort
- fn sort_key(&self) -> &[u8];
+ fn sort_key(&self) -> Self::B<'_>;
}
impl SortKey for String {
+ type B<'a> = &'a [u8];
+
fn sort_key(&self) -> &[u8] {
self.as_bytes()
}
}
impl SortKey for FixedBytes32 {
+ type B<'a> = &'a [u8];
+
fn sort_key(&self) -> &[u8] {
self.as_slice()
}
diff --git a/src/table/util.rs b/src/table/util.rs
index 0b10cf3f..a79e2cba 100644
--- a/src/table/util.rs
+++ b/src/table/util.rs
@@ -7,6 +7,8 @@ use crate::schema::*;
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EmptyKey;
impl SortKey for EmptyKey {
+ type B<'a> = &'a [u8];
+
fn sort_key(&self) -> &[u8] {
&[]
}