aboutsummaryrefslogtreecommitdiff
path: root/src/model/k2v/item_table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/k2v/item_table.rs')
-rw-r--r--src/model/k2v/item_table.rs69
1 files changed, 42 insertions, 27 deletions
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index 7860cb17..28646f37 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -1,7 +1,8 @@
-use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;
+use serde::{Deserialize, Serialize};
+
use garage_db as db;
use garage_util::data::*;
@@ -10,39 +11,50 @@ use garage_table::*;
use crate::index_counter::*;
use crate::k2v::causality::*;
-use crate::k2v::poll::*;
+use crate::k2v::sub::*;
pub const ENTRIES: &str = "entries";
pub const CONFLICTS: &str = "conflicts";
pub const VALUES: &str = "values";
pub const BYTES: &str = "bytes";
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct K2VItem {
- pub partition: K2VItemPartition,
- pub sort_key: String,
+mod v08 {
+ use crate::k2v::causality::K2VNodeId;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+ use std::collections::BTreeMap;
- items: BTreeMap<K2VNodeId, DvvsEntry>,
-}
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct K2VItem {
+ pub partition: K2VItemPartition,
+ pub sort_key: String,
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)]
-pub struct K2VItemPartition {
- pub bucket_id: Uuid,
- pub partition_key: String,
-}
+ pub(super) items: BTreeMap<K2VNodeId, DvvsEntry>,
+ }
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-struct DvvsEntry {
- t_discard: u64,
- values: Vec<(u64, DvvsValue)>,
-}
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)]
+ pub struct K2VItemPartition {
+ pub bucket_id: Uuid,
+ pub partition_key: String,
+ }
+
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct DvvsEntry {
+ pub(super) t_discard: u64,
+ pub(super) values: Vec<(u64, DvvsValue)>,
+ }
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub enum DvvsValue {
- Value(#[serde(with = "serde_bytes")] Vec<u8>),
- Deleted,
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub enum DvvsValue {
+ Value(#[serde(with = "serde_bytes")] Vec<u8>),
+ Deleted,
+ }
+
+ impl garage_util::migrate::InitialFormat for K2VItem {}
}
+pub use v08::*;
+
impl K2VItem {
/// Creates a new K2VItem when no previous entry existed in the db
pub fn new(bucket_id: Uuid, partition_key: String, sort_key: String) -> Self {
@@ -61,7 +73,8 @@ impl K2VItem {
this_node: Uuid,
context: &Option<CausalContext>,
new_value: DvvsValue,
- ) {
+ node_ts: u64,
+ ) -> u64 {
if let Some(context) = context {
for (node, t_discard) in context.vector_clock.iter() {
if let Some(e) = self.items.get_mut(node) {
@@ -86,12 +99,14 @@ impl K2VItem {
values: vec![],
});
let t_prev = e.max_time();
- e.values.push((t_prev + 1, new_value));
+ let t_new = std::cmp::max(t_prev + 1, node_ts + 1);
+ e.values.push((t_new, new_value));
+ t_new
}
/// Extract the causality context of a K2V Item
pub fn causal_context(&self) -> CausalContext {
- let mut cc = CausalContext::new_empty();
+ let mut cc = CausalContext::new();
for (node, ent) in self.items.iter() {
cc.vector_clock.insert(*node, ent.max_time());
}
@@ -161,9 +176,9 @@ impl Crdt for DvvsEntry {
impl PartitionKey for K2VItemPartition {
fn hash(&self) -> Hash {
- use blake2::{Blake2b, Digest};
+ use blake2::{Blake2b512, Digest};
- let mut hasher = Blake2b::new();
+ let mut hasher = Blake2b512::new();
hasher.update(self.bucket_id.as_slice());
hasher.update(self.partition_key.as_bytes());
let mut hash = [0u8; 32];