diff options
author | Alex Auvolat <alex@adnab.me> | 2023-01-05 13:11:48 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-01-05 13:11:48 +0100 |
commit | 49b5d18554c67b84777d97f24423207c2375ae5e (patch) | |
tree | 70403f9899d8d47897e766fed6171f06ed047b35 /src/model/k2v/rpc.rs | |
parent | 02e8eb167efa1f08d69fe7f8e6192cde726c45aa (diff) | |
download | garage-49b5d18554c67b84777d97f24423207c2375ae5e.tar.gz garage-49b5d18554c67b84777d97f24423207c2375ae5e.zip |
K2V history and preparation for range watch
Diffstat (limited to 'src/model/k2v/rpc.rs')
-rw-r--r-- | src/model/k2v/rpc.rs | 62 |
1 files changed, 55 insertions, 7 deletions
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index f64a7984..1dc396c0 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -6,6 +6,7 @@ //! mean the vector clock gets much larger than needed). use std::collections::HashMap; +use std::convert::TryInto; use std::sync::Arc; use std::time::Duration; @@ -15,9 +16,12 @@ use futures::StreamExt; use serde::{Deserialize, Serialize}; use tokio::select; +use garage_db as db; + use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::*; +use garage_util::time::*; use garage_rpc::system::System; use garage_rpc::*; @@ -26,6 +30,7 @@ use garage_table::replication::{TableReplication, TableShardedReplication}; use garage_table::{PartitionKey, Table}; use crate::k2v::causality::*; +use crate::k2v::history_table::*; use crate::k2v::item_table::*; use crate::k2v::poll::*; @@ -59,6 +64,8 @@ impl Rpc for K2VRpc { pub struct K2VRpcHandler { system: Arc<System>, item_table: Arc<Table<K2VItemTable, TableShardedReplication>>, + history_table: Arc<Table<K2VHistoryTable, TableShardedReplication>>, + local_counter_tree: db::Tree, endpoint: Arc<Endpoint<K2VRpc, Self>>, subscriptions: Arc<SubscriptionManager>, } @@ -66,14 +73,21 @@ pub struct K2VRpcHandler { impl K2VRpcHandler { pub fn new( system: Arc<System>, + db: &db::Db, item_table: Arc<Table<K2VItemTable, TableShardedReplication>>, + history_table: Arc<Table<K2VHistoryTable, TableShardedReplication>>, subscriptions: Arc<SubscriptionManager>, ) -> Arc<Self> { + let local_counter_tree = db + .open_tree("k2v_local_counter") + .expect("Unable to open DB tree for k2v local counter"); let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string()); let rpc_handler = Arc::new(Self { system, item_table, + history_table, + local_counter_tree, endpoint, subscriptions, }); @@ -181,7 +195,7 @@ impl K2VRpcHandler { Ok(()) } - pub async fn poll( + pub async fn poll_item( &self, bucket_id: Uuid, partition_key: String, @@ -273,9 +287,17 @@ impl K2VRpcHandler { } fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> { + let now = now_msec(); + self.item_table .data - .update_entry_with(&item.partition, &item.sort_key, |ent| { + .update_entry_with(&item.partition, &item.sort_key, |tx, ent| { + let old_local_counter = tx + .get(&self.local_counter_tree, b"counter")? + .and_then(|x| x.try_into().ok()) + .map(u64::from_be_bytes) + .unwrap_or_default(); + let mut ent = ent.unwrap_or_else(|| { K2VItem::new( item.partition.bucket_id, @@ -283,13 +305,39 @@ impl K2VRpcHandler { item.sort_key.clone(), ) }); - ent.update(self.system.id, &item.causal_context, item.value.clone()); - ent + let new_local_counter = ent.update( + self.system.id, + &item.causal_context, + item.value.clone(), + old_local_counter, + ); + + tx.insert( + &self.local_counter_tree, + b"counter", + u64::to_be_bytes(new_local_counter), + )?; + + let hist_entry = K2VHistoryEntry { + partition: ent.partition.clone(), + node_counter: K2VHistorySortKey { + node: make_node_id(self.system.id), + counter: new_local_counter, + }, + prev_counter: old_local_counter, + timestamp: now, + ins_sort_key: item.sort_key.clone(), + ins_value: item.value.clone(), + deleted: false.into(), + }; + self.history_table.queue_insert(tx, &hist_entry)?; + + Ok(ent) }) } - async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> { - let mut chan = self.subscriptions.subscribe(key); + async fn handle_poll_item(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> { + let mut chan = self.subscriptions.subscribe_item(key); let mut value = self .item_table @@ -326,7 +374,7 @@ impl EndpointHandler<K2VRpc> for K2VRpcHandler { } => { let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec)); select! { - ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), + ret = self.handle_poll_item(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), _ = delay => Ok(K2VRpc::PollItemResponse(None)), } } |