diff options
author | Alex Auvolat <alex@adnab.me> | 2023-01-10 11:01:49 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-01-10 11:03:52 +0100 |
commit | 9f5419f465de3ddbb9afc91464d0eb00636049b9 (patch) | |
tree | ca32f851cda043cfe61873ea46268605f1e2c8ed /src/model/k2v/rpc.rs | |
parent | a48e2e0cb2bdc75e14dfde199dbca0a779b1316b (diff) | |
download | garage-9f5419f465de3ddbb9afc91464d0eb00636049b9.tar.gz garage-9f5419f465de3ddbb9afc91464d0eb00636049b9.zip |
Make K2V item timestamps globally increasing on each node
Diffstat (limited to 'src/model/k2v/rpc.rs')
-rw-r--r-- | src/model/k2v/rpc.rs | 45 |
1 files changed, 41 insertions, 4 deletions
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 8860676b..e5497215 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -6,7 +6,8 @@ //! mean the vector clock gets much larger than needed). use std::collections::HashMap; -use std::sync::Arc; +use std::convert::TryInto; +use std::sync::{Arc, Mutex}; use std::time::Duration; use async_trait::async_trait; @@ -15,9 +16,12 @@ use futures::StreamExt; use serde::{Deserialize, Serialize}; use tokio::select; +use garage_db as db; + use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::*; +use garage_util::time::now_msec; use garage_rpc::system::System; use garage_rpc::*; @@ -29,6 +33,8 @@ use crate::k2v::causality::*; use crate::k2v::item_table::*; use crate::k2v::sub::*; +const TIMESTAMP_KEY: &'static [u8] = b"timestamp"; + /// RPC messages for K2V #[derive(Debug, Serialize, Deserialize)] enum K2VRpc { @@ -59,6 +65,7 @@ impl Rpc for K2VRpc { pub struct K2VRpcHandler { system: Arc<System>, item_table: Arc<Table<K2VItemTable, TableShardedReplication>>, + local_timestamp_tree: Mutex<db::Tree>, endpoint: Arc<Endpoint<K2VRpc, Self>>, subscriptions: Arc<SubscriptionManager>, } @@ -66,14 +73,19 @@ pub struct K2VRpcHandler { impl K2VRpcHandler { pub fn new( system: Arc<System>, + db: &db::Db, item_table: Arc<Table<K2VItemTable, TableShardedReplication>>, subscriptions: Arc<SubscriptionManager>, ) -> Arc<Self> { + let local_timestamp_tree = db + .open_tree("k2v_local_timestamp") + .expect("Unable to open DB tree for k2v local timestamp"); let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string()); let rpc_handler = Arc::new(Self { system, item_table, + local_timestamp_tree: Mutex::new(local_timestamp_tree), endpoint, subscriptions, }); @@ -273,9 +285,22 @@ impl K2VRpcHandler { } fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> { + // Using a mutex on the local_timestamp_tree is not strictly necessary, + // but it helps to not try to do several inserts at the same time, + // which would create transaction conflicts and force many useless retries. + let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); + + let now = now_msec(); + self.item_table .data - .update_entry_with(&item.partition, &item.sort_key, |ent| { + .update_entry_with(&item.partition, &item.sort_key, |tx, ent| { + let old_local_timestamp = tx + .get(&local_timestamp_tree, TIMESTAMP_KEY)? + .and_then(|x| x.try_into().ok()) + .map(u64::from_be_bytes) + .unwrap_or_default(); + let mut ent = ent.unwrap_or_else(|| { K2VItem::new( item.partition.bucket_id, @@ -283,8 +308,20 @@ impl K2VRpcHandler { item.sort_key.clone(), ) }); - ent.update(self.system.id, &item.causal_context, item.value.clone()); - ent + let new_local_timestamp = ent.update( + self.system.id, + &item.causal_context, + item.value.clone(), + std::cmp::max(old_local_timestamp, now), + ); + + tx.insert( + &local_timestamp_tree, + TIMESTAMP_KEY, + u64::to_be_bytes(new_local_timestamp), + )?; + + Ok(ent) }) } |