From 49b5d18554c67b84777d97f24423207c2375ae5e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 5 Jan 2023 13:11:48 +0100 Subject: K2V history and preparation for range watch --- src/model/k2v/rpc.rs | 62 ++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 55 insertions(+), 7 deletions(-) (limited to 'src/model/k2v/rpc.rs') diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index f64a7984..1dc396c0 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -6,6 +6,7 @@ //! mean the vector clock gets much larger than needed). use std::collections::HashMap; +use std::convert::TryInto; use std::sync::Arc; use std::time::Duration; @@ -15,9 +16,12 @@ use futures::StreamExt; use serde::{Deserialize, Serialize}; use tokio::select; +use garage_db as db; + use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::*; +use garage_util::time::*; use garage_rpc::system::System; use garage_rpc::*; @@ -26,6 +30,7 @@ use garage_table::replication::{TableReplication, TableShardedReplication}; use garage_table::{PartitionKey, Table}; use crate::k2v::causality::*; +use crate::k2v::history_table::*; use crate::k2v::item_table::*; use crate::k2v::poll::*; @@ -59,6 +64,8 @@ impl Rpc for K2VRpc { pub struct K2VRpcHandler { system: Arc, item_table: Arc>, + history_table: Arc>, + local_counter_tree: db::Tree, endpoint: Arc>, subscriptions: Arc, } @@ -66,14 +73,21 @@ pub struct K2VRpcHandler { impl K2VRpcHandler { pub fn new( system: Arc, + db: &db::Db, item_table: Arc>, + history_table: Arc>, subscriptions: Arc, ) -> Arc { + let local_counter_tree = db + .open_tree("k2v_local_counter") + .expect("Unable to open DB tree for k2v local counter"); let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string()); let rpc_handler = Arc::new(Self { system, item_table, + history_table, + local_counter_tree, endpoint, subscriptions, }); @@ -181,7 +195,7 @@ impl K2VRpcHandler { Ok(()) } - pub async fn poll( + pub async fn poll_item( &self, bucket_id: Uuid, partition_key: String, @@ -273,9 +287,17 @@ impl K2VRpcHandler { } fn local_insert(&self, item: &InsertedItem) -> Result, Error> { + let now = now_msec(); + self.item_table .data - .update_entry_with(&item.partition, &item.sort_key, |ent| { + .update_entry_with(&item.partition, &item.sort_key, |tx, ent| { + let old_local_counter = tx + .get(&self.local_counter_tree, b"counter")? + .and_then(|x| x.try_into().ok()) + .map(u64::from_be_bytes) + .unwrap_or_default(); + let mut ent = ent.unwrap_or_else(|| { K2VItem::new( item.partition.bucket_id, @@ -283,13 +305,39 @@ impl K2VRpcHandler { item.sort_key.clone(), ) }); - ent.update(self.system.id, &item.causal_context, item.value.clone()); - ent + let new_local_counter = ent.update( + self.system.id, + &item.causal_context, + item.value.clone(), + old_local_counter, + ); + + tx.insert( + &self.local_counter_tree, + b"counter", + u64::to_be_bytes(new_local_counter), + )?; + + let hist_entry = K2VHistoryEntry { + partition: ent.partition.clone(), + node_counter: K2VHistorySortKey { + node: make_node_id(self.system.id), + counter: new_local_counter, + }, + prev_counter: old_local_counter, + timestamp: now, + ins_sort_key: item.sort_key.clone(), + ins_value: item.value.clone(), + deleted: false.into(), + }; + self.history_table.queue_insert(tx, &hist_entry)?; + + Ok(ent) }) } - async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result { - let mut chan = self.subscriptions.subscribe(key); + async fn handle_poll_item(&self, key: &PollKey, ct: &CausalContext) -> Result { + let mut chan = self.subscriptions.subscribe_item(key); let mut value = self .item_table @@ -326,7 +374,7 @@ impl EndpointHandler for K2VRpcHandler { } => { let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec)); select! { - ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), + ret = self.handle_poll_item(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), _ = delay => Ok(K2VRpc::PollItemResponse(None)), } } -- cgit v1.2.3