aboutsummaryrefslogtreecommitdiff
path: root/src/model/k2v
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/k2v')
-rw-r--r--src/model/k2v/item_table.rs7
-rw-r--r--src/model/k2v/rpc.rs45
2 files changed, 46 insertions, 6 deletions
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index a22df68a..1ba9bb46 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -73,7 +73,8 @@ impl K2VItem {
this_node: Uuid,
context: &Option<CausalContext>,
new_value: DvvsValue,
- ) {
+ node_ts: u64,
+ ) -> u64 {
if let Some(context) = context {
for (node, t_discard) in context.vector_clock.iter() {
if let Some(e) = self.items.get_mut(node) {
@@ -98,7 +99,9 @@ impl K2VItem {
values: vec![],
});
let t_prev = e.max_time();
- e.values.push((t_prev + 1, new_value));
+ let t_new = std::cmp::max(t_prev + 1, node_ts + 1);
+ e.values.push((t_new, new_value));
+ t_new
}
/// Extract the causality context of a K2V Item
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 8860676b..e5497215 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -6,7 +6,8 @@
//! mean the vector clock gets much larger than needed).
use std::collections::HashMap;
-use std::sync::Arc;
+use std::convert::TryInto;
+use std::sync::{Arc, Mutex};
use std::time::Duration;
use async_trait::async_trait;
@@ -15,9 +16,12 @@ use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::select;
+use garage_db as db;
+
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::time::now_msec;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -29,6 +33,8 @@ use crate::k2v::causality::*;
use crate::k2v::item_table::*;
use crate::k2v::sub::*;
+const TIMESTAMP_KEY: &'static [u8] = b"timestamp";
+
/// RPC messages for K2V
#[derive(Debug, Serialize, Deserialize)]
enum K2VRpc {
@@ -59,6 +65,7 @@ impl Rpc for K2VRpc {
pub struct K2VRpcHandler {
system: Arc<System>,
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+ local_timestamp_tree: Mutex<db::Tree>,
endpoint: Arc<Endpoint<K2VRpc, Self>>,
subscriptions: Arc<SubscriptionManager>,
}
@@ -66,14 +73,19 @@ pub struct K2VRpcHandler {
impl K2VRpcHandler {
pub fn new(
system: Arc<System>,
+ db: &db::Db,
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
subscriptions: Arc<SubscriptionManager>,
) -> Arc<Self> {
+ let local_timestamp_tree = db
+ .open_tree("k2v_local_timestamp")
+ .expect("Unable to open DB tree for k2v local timestamp");
let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string());
let rpc_handler = Arc::new(Self {
system,
item_table,
+ local_timestamp_tree: Mutex::new(local_timestamp_tree),
endpoint,
subscriptions,
});
@@ -273,9 +285,22 @@ impl K2VRpcHandler {
}
fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
+ // Using a mutex on the local_timestamp_tree is not strictly necessary,
+ // but it helps to not try to do several inserts at the same time,
+ // which would create transaction conflicts and force many useless retries.
+ let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap();
+
+ let now = now_msec();
+
self.item_table
.data
- .update_entry_with(&item.partition, &item.sort_key, |ent| {
+ .update_entry_with(&item.partition, &item.sort_key, |tx, ent| {
+ let old_local_timestamp = tx
+ .get(&local_timestamp_tree, TIMESTAMP_KEY)?
+ .and_then(|x| x.try_into().ok())
+ .map(u64::from_be_bytes)
+ .unwrap_or_default();
+
let mut ent = ent.unwrap_or_else(|| {
K2VItem::new(
item.partition.bucket_id,
@@ -283,8 +308,20 @@ impl K2VRpcHandler {
item.sort_key.clone(),
)
});
- ent.update(self.system.id, &item.causal_context, item.value.clone());
- ent
+ let new_local_timestamp = ent.update(
+ self.system.id,
+ &item.causal_context,
+ item.value.clone(),
+ std::cmp::max(old_local_timestamp, now),
+ );
+
+ tx.insert(
+ &local_timestamp_tree,
+ TIMESTAMP_KEY,
+ u64::to_be_bytes(new_local_timestamp),
+ )?;
+
+ Ok(ent)
})
}