aboutsummaryrefslogtreecommitdiff
path: root/src/model/k2v/rpc.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-01-11 11:35:36 +0100
committerAlex Auvolat <alex@adnab.me>2023-01-11 11:35:36 +0100
commit09a3dad0f2c1641e4e300809dbdb3599b32efc03 (patch)
treefb2777c2e44fa3b27be6be81bb35e6d94af59d9d /src/model/k2v/rpc.rs
parent32aab06929f09ce69bc49c4737b4801dd31a3b6f (diff)
downloadgarage-09a3dad0f2c1641e4e300809dbdb3599b32efc03.tar.gz
garage-09a3dad0f2c1641e4e300809dbdb3599b32efc03.zip
Lock once for insert_many
Diffstat (limited to 'src/model/k2v/rpc.rs')
-rw-r--r--src/model/k2v/rpc.rs34
1 files changed, 22 insertions, 12 deletions
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 8b070885..04ab3ab9 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -7,7 +7,7 @@
use std::collections::{BTreeMap, HashMap};
use std::convert::TryInto;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
use async_trait::async_trait;
@@ -72,7 +72,12 @@ impl Rpc for K2VRpc {
pub struct K2VRpcHandler {
system: Arc<System>,
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+
+ // Using a mutex on the local_timestamp_tree is not strictly necessary,
+ // but it helps to not try to do several inserts at the same time,
+ // which would create transaction conflicts and force many useless retries.
local_timestamp_tree: Mutex<db::Tree>,
+
endpoint: Arc<Endpoint<K2VRpc, Self>>,
subscriptions: Arc<SubscriptionManager>,
}
@@ -323,7 +328,10 @@ impl K2VRpcHandler {
// ---- internal handlers ----
async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> {
- let new = self.local_insert(item)?;
+ let new = {
+ let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap();
+ self.local_insert(&local_timestamp_tree, item)?
+ };
// Propagate to rest of network
if let Some(updated) = new {
@@ -336,11 +344,14 @@ impl K2VRpcHandler {
async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result<K2VRpc, Error> {
let mut updated_vec = vec![];
- for item in items {
- let new = self.local_insert(item)?;
+ {
+ let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap();
+ for item in items {
+ let new = self.local_insert(&local_timestamp_tree, item)?;
- if let Some(updated) = new {
- updated_vec.push(updated);
+ if let Some(updated) = new {
+ updated_vec.push(updated);
+ }
}
}
@@ -352,12 +363,11 @@ impl K2VRpcHandler {
Ok(K2VRpc::Ok)
}
- fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
- // Using a mutex on the local_timestamp_tree is not strictly necessary,
- // but it helps to not try to do several inserts at the same time,
- // which would create transaction conflicts and force many useless retries.
- let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap();
-
+ fn local_insert(
+ &self,
+ local_timestamp_tree: &MutexGuard<'_, db::Tree>,
+ item: &InsertedItem,
+ ) -> Result<Option<K2VItem>, Error> {
let now = now_msec();
self.item_table