aboutsummaryrefslogtreecommitdiff
path: root/src/model/k2v/rpc.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-01-05 13:11:48 +0100
committerAlex Auvolat <alex@adnab.me>2023-01-05 13:11:48 +0100
commit49b5d18554c67b84777d97f24423207c2375ae5e (patch)
tree70403f9899d8d47897e766fed6171f06ed047b35 /src/model/k2v/rpc.rs
parent02e8eb167efa1f08d69fe7f8e6192cde726c45aa (diff)
downloadgarage-49b5d18554c67b84777d97f24423207c2375ae5e.tar.gz
garage-49b5d18554c67b84777d97f24423207c2375ae5e.zip
K2V history and preparation for range watch
Diffstat (limited to 'src/model/k2v/rpc.rs')
-rw-r--r--src/model/k2v/rpc.rs62
1 files changed, 55 insertions, 7 deletions
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index f64a7984..1dc396c0 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -6,6 +6,7 @@
//! mean the vector clock gets much larger than needed).
use std::collections::HashMap;
+use std::convert::TryInto;
use std::sync::Arc;
use std::time::Duration;
@@ -15,9 +16,12 @@ use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::select;
+use garage_db as db;
+
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::time::*;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -26,6 +30,7 @@ use garage_table::replication::{TableReplication, TableShardedReplication};
use garage_table::{PartitionKey, Table};
use crate::k2v::causality::*;
+use crate::k2v::history_table::*;
use crate::k2v::item_table::*;
use crate::k2v::poll::*;
@@ -59,6 +64,8 @@ impl Rpc for K2VRpc {
pub struct K2VRpcHandler {
system: Arc<System>,
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+ history_table: Arc<Table<K2VHistoryTable, TableShardedReplication>>,
+ local_counter_tree: db::Tree,
endpoint: Arc<Endpoint<K2VRpc, Self>>,
subscriptions: Arc<SubscriptionManager>,
}
@@ -66,14 +73,21 @@ pub struct K2VRpcHandler {
impl K2VRpcHandler {
pub fn new(
system: Arc<System>,
+ db: &db::Db,
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+ history_table: Arc<Table<K2VHistoryTable, TableShardedReplication>>,
subscriptions: Arc<SubscriptionManager>,
) -> Arc<Self> {
+ let local_counter_tree = db
+ .open_tree("k2v_local_counter")
+ .expect("Unable to open DB tree for k2v local counter");
let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string());
let rpc_handler = Arc::new(Self {
system,
item_table,
+ history_table,
+ local_counter_tree,
endpoint,
subscriptions,
});
@@ -181,7 +195,7 @@ impl K2VRpcHandler {
Ok(())
}
- pub async fn poll(
+ pub async fn poll_item(
&self,
bucket_id: Uuid,
partition_key: String,
@@ -273,9 +287,17 @@ impl K2VRpcHandler {
}
fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
+ let now = now_msec();
+
self.item_table
.data
- .update_entry_with(&item.partition, &item.sort_key, |ent| {
+ .update_entry_with(&item.partition, &item.sort_key, |tx, ent| {
+ let old_local_counter = tx
+ .get(&self.local_counter_tree, b"counter")?
+ .and_then(|x| x.try_into().ok())
+ .map(u64::from_be_bytes)
+ .unwrap_or_default();
+
let mut ent = ent.unwrap_or_else(|| {
K2VItem::new(
item.partition.bucket_id,
@@ -283,13 +305,39 @@ impl K2VRpcHandler {
item.sort_key.clone(),
)
});
- ent.update(self.system.id, &item.causal_context, item.value.clone());
- ent
+ let new_local_counter = ent.update(
+ self.system.id,
+ &item.causal_context,
+ item.value.clone(),
+ old_local_counter,
+ );
+
+ tx.insert(
+ &self.local_counter_tree,
+ b"counter",
+ u64::to_be_bytes(new_local_counter),
+ )?;
+
+ let hist_entry = K2VHistoryEntry {
+ partition: ent.partition.clone(),
+ node_counter: K2VHistorySortKey {
+ node: make_node_id(self.system.id),
+ counter: new_local_counter,
+ },
+ prev_counter: old_local_counter,
+ timestamp: now,
+ ins_sort_key: item.sort_key.clone(),
+ ins_value: item.value.clone(),
+ deleted: false.into(),
+ };
+ self.history_table.queue_insert(tx, &hist_entry)?;
+
+ Ok(ent)
})
}
- async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
- let mut chan = self.subscriptions.subscribe(key);
+ async fn handle_poll_item(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
+ let mut chan = self.subscriptions.subscribe_item(key);
let mut value = self
.item_table
@@ -326,7 +374,7 @@ impl EndpointHandler<K2VRpc> for K2VRpcHandler {
} => {
let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
select! {
- ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
+ ret = self.handle_poll_item(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
_ = delay => Ok(K2VRpc::PollItemResponse(None)),
}
}