diff options
Diffstat (limited to 'src/model/k2v')
-rw-r--r-- | src/model/k2v/history_table.rs | 20 | ||||
-rw-r--r-- | src/model/k2v/item_table.rs | 2 | ||||
-rw-r--r-- | src/model/k2v/mod.rs | 3 | ||||
-rw-r--r-- | src/model/k2v/rpc.rs | 11 | ||||
-rw-r--r-- | src/model/k2v/sub.rs (renamed from src/model/k2v/poll.rs) | 8 |
5 files changed, 22 insertions, 22 deletions
diff --git a/src/model/k2v/history_table.rs b/src/model/k2v/history_table.rs index 6a6e9a10..9df03f5d 100644 --- a/src/model/k2v/history_table.rs +++ b/src/model/k2v/history_table.rs @@ -5,18 +5,21 @@ use garage_db as db; use garage_table::crdt::*; use garage_table::*; -use crate::k2v::poll::*; +use crate::k2v::sub::*; mod v08 { use crate::k2v::causality::K2VNodeId; - pub use crate::k2v::item_table::v08::{DvvsValue, K2VItemPartition}; + pub use crate::k2v::item_table::v08::{DvvsValue, K2VItem, K2VItemPartition}; use garage_util::crdt; use serde::{Deserialize, Serialize}; #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct K2VHistoryEntry { - /// Partition key: a K2V partition - pub partition: K2VItemPartition, + // Partition key: the partition key of ins_item + + /// The inserted item + pub ins_item: K2VItem, + /// Sort key: the node ID and its local counter pub node_counter: K2VHistorySortKey, @@ -25,12 +28,8 @@ mod v08 { /// The timesamp of the update (!= counter, counters are incremented /// by one, timestamps are real clock timestamps) pub timestamp: u64, - /// The sort key of the item that was inserted - pub ins_sort_key: String, - /// The inserted value - pub ins_value: DvvsValue, - /// Whether this history entry is too old and should be deleted + /// Mark this history entry for deletion pub deleted: crdt::Bool, } @@ -49,6 +48,7 @@ pub use v08::*; impl Crdt for K2VHistoryEntry { fn merge(&mut self, other: &Self) { + self.ins_item.merge(&other.ins_item); self.deleted.merge(&other.deleted); } } @@ -66,7 +66,7 @@ impl SortKey for K2VHistorySortKey { impl Entry<K2VItemPartition, K2VHistorySortKey> for K2VHistoryEntry { fn partition_key(&self) -> &K2VItemPartition { - &self.partition + &self.ins_item.partition } fn sort_key(&self) -> &K2VHistorySortKey { &self.node_counter diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index 90a2f4d0..cf60fd3f 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -11,7 +11,7 @@ use garage_table::*; use crate::index_counter::*; use crate::k2v::causality::*; -use crate::k2v::poll::*; +use crate::k2v::sub::*; pub const ENTRIES: &str = "entries"; pub const CONFLICTS: &str = "conflicts"; diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs index 18deabac..4f7de5b7 100644 --- a/src/model/k2v/mod.rs +++ b/src/model/k2v/mod.rs @@ -3,5 +3,6 @@ pub mod causality; pub mod history_table; pub mod item_table; -pub mod poll; +pub(crate) mod sub; + pub mod rpc; diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 1dc396c0..c3cb5f9f 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -32,7 +32,7 @@ use garage_table::{PartitionKey, Table}; use crate::k2v::causality::*; use crate::k2v::history_table::*; use crate::k2v::item_table::*; -use crate::k2v::poll::*; +use crate::k2v::sub::*; /// RPC messages for K2V #[derive(Debug, Serialize, Deserialize)] @@ -292,8 +292,9 @@ impl K2VRpcHandler { self.item_table .data .update_entry_with(&item.partition, &item.sort_key, |tx, ent| { + let local_counter_key = item.partition.hash(); let old_local_counter = tx - .get(&self.local_counter_tree, b"counter")? + .get(&self.local_counter_tree, &local_counter_key)? .and_then(|x| x.try_into().ok()) .map(u64::from_be_bytes) .unwrap_or_default(); @@ -314,20 +315,18 @@ impl K2VRpcHandler { tx.insert( &self.local_counter_tree, - b"counter", + &local_counter_key, u64::to_be_bytes(new_local_counter), )?; let hist_entry = K2VHistoryEntry { - partition: ent.partition.clone(), + ins_item: ent.clone(), node_counter: K2VHistorySortKey { node: make_node_id(self.system.id), counter: new_local_counter, }, prev_counter: old_local_counter, timestamp: now, - ins_sort_key: item.sort_key.clone(), - ins_value: item.value.clone(), deleted: false.into(), }; self.history_table.queue_insert(tx, &hist_entry)?; diff --git a/src/model/k2v/poll.rs b/src/model/k2v/sub.rs index ea3e8d41..1dcca4d5 100644 --- a/src/model/k2v/poll.rs +++ b/src/model/k2v/sub.rs @@ -95,21 +95,21 @@ impl SubscriptionManager { impl PollRange { fn matches(&self, entry: &K2VHistoryEntry) -> bool { - entry.partition == self.partition + entry.ins_item.partition == self.partition && self .prefix .as_ref() - .map(|x| entry.ins_sort_key.starts_with(x)) + .map(|x| entry.ins_item.sort_key.starts_with(x)) .unwrap_or(true) && self .start .as_ref() - .map(|x| entry.ins_sort_key >= *x) + .map(|x| entry.ins_item.sort_key >= *x) .unwrap_or(true) && self .end .as_ref() - .map(|x| entry.ins_sort_key < *x) + .map(|x| entry.ins_item.sort_key < *x) .unwrap_or(true) } } |