aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/model/garage.rs2
-rw-r--r--src/model/k2v/history_table.rs20
-rw-r--r--src/model/k2v/item_table.rs2
-rw-r--r--src/model/k2v/mod.rs3
-rw-r--r--src/model/k2v/rpc.rs11
-rw-r--r--src/model/k2v/sub.rs (renamed from src/model/k2v/poll.rs)8
6 files changed, 23 insertions, 23 deletions
diff --git a/src/model/garage.rs b/src/model/garage.rs
index a33265af..c0ffdd31 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -27,7 +27,7 @@ use crate::index_counter::*;
use crate::key_table::*;
#[cfg(feature = "k2v")]
-use crate::k2v::{history_table::*, item_table::*, poll::*, rpc::*};
+use crate::k2v::{history_table::*, item_table::*, sub::*, rpc::*};
/// An entire Garage full of data
pub struct Garage {
diff --git a/src/model/k2v/history_table.rs b/src/model/k2v/history_table.rs
index 6a6e9a10..9df03f5d 100644
--- a/src/model/k2v/history_table.rs
+++ b/src/model/k2v/history_table.rs
@@ -5,18 +5,21 @@ use garage_db as db;
use garage_table::crdt::*;
use garage_table::*;
-use crate::k2v::poll::*;
+use crate::k2v::sub::*;
mod v08 {
use crate::k2v::causality::K2VNodeId;
- pub use crate::k2v::item_table::v08::{DvvsValue, K2VItemPartition};
+ pub use crate::k2v::item_table::v08::{DvvsValue, K2VItem, K2VItemPartition};
use garage_util::crdt;
use serde::{Deserialize, Serialize};
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct K2VHistoryEntry {
- /// Partition key: a K2V partition
- pub partition: K2VItemPartition,
+ // Partition key: the partition key of ins_item
+
+ /// The inserted item
+ pub ins_item: K2VItem,
+
/// Sort key: the node ID and its local counter
pub node_counter: K2VHistorySortKey,
@@ -25,12 +28,8 @@ mod v08 {
/// The timesamp of the update (!= counter, counters are incremented
/// by one, timestamps are real clock timestamps)
pub timestamp: u64,
- /// The sort key of the item that was inserted
- pub ins_sort_key: String,
- /// The inserted value
- pub ins_value: DvvsValue,
- /// Whether this history entry is too old and should be deleted
+ /// Mark this history entry for deletion
pub deleted: crdt::Bool,
}
@@ -49,6 +48,7 @@ pub use v08::*;
impl Crdt for K2VHistoryEntry {
fn merge(&mut self, other: &Self) {
+ self.ins_item.merge(&other.ins_item);
self.deleted.merge(&other.deleted);
}
}
@@ -66,7 +66,7 @@ impl SortKey for K2VHistorySortKey {
impl Entry<K2VItemPartition, K2VHistorySortKey> for K2VHistoryEntry {
fn partition_key(&self) -> &K2VItemPartition {
- &self.partition
+ &self.ins_item.partition
}
fn sort_key(&self) -> &K2VHistorySortKey {
&self.node_counter
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index 90a2f4d0..cf60fd3f 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -11,7 +11,7 @@ use garage_table::*;
use crate::index_counter::*;
use crate::k2v::causality::*;
-use crate::k2v::poll::*;
+use crate::k2v::sub::*;
pub const ENTRIES: &str = "entries";
pub const CONFLICTS: &str = "conflicts";
diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs
index 18deabac..4f7de5b7 100644
--- a/src/model/k2v/mod.rs
+++ b/src/model/k2v/mod.rs
@@ -3,5 +3,6 @@ pub mod causality;
pub mod history_table;
pub mod item_table;
-pub mod poll;
+pub(crate) mod sub;
+
pub mod rpc;
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 1dc396c0..c3cb5f9f 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -32,7 +32,7 @@ use garage_table::{PartitionKey, Table};
use crate::k2v::causality::*;
use crate::k2v::history_table::*;
use crate::k2v::item_table::*;
-use crate::k2v::poll::*;
+use crate::k2v::sub::*;
/// RPC messages for K2V
#[derive(Debug, Serialize, Deserialize)]
@@ -292,8 +292,9 @@ impl K2VRpcHandler {
self.item_table
.data
.update_entry_with(&item.partition, &item.sort_key, |tx, ent| {
+ let local_counter_key = item.partition.hash();
let old_local_counter = tx
- .get(&self.local_counter_tree, b"counter")?
+ .get(&self.local_counter_tree, &local_counter_key)?
.and_then(|x| x.try_into().ok())
.map(u64::from_be_bytes)
.unwrap_or_default();
@@ -314,20 +315,18 @@ impl K2VRpcHandler {
tx.insert(
&self.local_counter_tree,
- b"counter",
+ &local_counter_key,
u64::to_be_bytes(new_local_counter),
)?;
let hist_entry = K2VHistoryEntry {
- partition: ent.partition.clone(),
+ ins_item: ent.clone(),
node_counter: K2VHistorySortKey {
node: make_node_id(self.system.id),
counter: new_local_counter,
},
prev_counter: old_local_counter,
timestamp: now,
- ins_sort_key: item.sort_key.clone(),
- ins_value: item.value.clone(),
deleted: false.into(),
};
self.history_table.queue_insert(tx, &hist_entry)?;
diff --git a/src/model/k2v/poll.rs b/src/model/k2v/sub.rs
index ea3e8d41..1dcca4d5 100644
--- a/src/model/k2v/poll.rs
+++ b/src/model/k2v/sub.rs
@@ -95,21 +95,21 @@ impl SubscriptionManager {
impl PollRange {
fn matches(&self, entry: &K2VHistoryEntry) -> bool {
- entry.partition == self.partition
+ entry.ins_item.partition == self.partition
&& self
.prefix
.as_ref()
- .map(|x| entry.ins_sort_key.starts_with(x))
+ .map(|x| entry.ins_item.sort_key.starts_with(x))
.unwrap_or(true)
&& self
.start
.as_ref()
- .map(|x| entry.ins_sort_key >= *x)
+ .map(|x| entry.ins_item.sort_key >= *x)
.unwrap_or(true)
&& self
.end
.as_ref()
- .map(|x| entry.ins_sort_key < *x)
+ .map(|x| entry.ins_item.sort_key < *x)
.unwrap_or(true)
}
}