diff options
author | Alex Auvolat <alex@adnab.me> | 2023-01-11 11:35:36 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-01-11 11:35:36 +0100 |
commit | 09a3dad0f2c1641e4e300809dbdb3599b32efc03 (patch) | |
tree | fb2777c2e44fa3b27be6be81bb35e6d94af59d9d /src/model/k2v | |
parent | 32aab06929f09ce69bc49c4737b4801dd31a3b6f (diff) | |
download | garage-09a3dad0f2c1641e4e300809dbdb3599b32efc03.tar.gz garage-09a3dad0f2c1641e4e300809dbdb3599b32efc03.zip |
Lock once for insert_many
Diffstat (limited to 'src/model/k2v')
-rw-r--r-- | src/model/k2v/rpc.rs | 34 |
1 files changed, 22 insertions, 12 deletions
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 8b070885..04ab3ab9 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -7,7 +7,7 @@ use std::collections::{BTreeMap, HashMap}; use std::convert::TryInto; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, MutexGuard}; use std::time::Duration; use async_trait::async_trait; @@ -72,7 +72,12 @@ impl Rpc for K2VRpc { pub struct K2VRpcHandler { system: Arc<System>, item_table: Arc<Table<K2VItemTable, TableShardedReplication>>, + + // Using a mutex on the local_timestamp_tree is not strictly necessary, + // but it helps to not try to do several inserts at the same time, + // which would create transaction conflicts and force many useless retries. local_timestamp_tree: Mutex<db::Tree>, + endpoint: Arc<Endpoint<K2VRpc, Self>>, subscriptions: Arc<SubscriptionManager>, } @@ -323,7 +328,10 @@ impl K2VRpcHandler { // ---- internal handlers ---- async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> { - let new = self.local_insert(item)?; + let new = { + let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); + self.local_insert(&local_timestamp_tree, item)? + }; // Propagate to rest of network if let Some(updated) = new { @@ -336,11 +344,14 @@ impl K2VRpcHandler { async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result<K2VRpc, Error> { let mut updated_vec = vec![]; - for item in items { - let new = self.local_insert(item)?; + { + let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); + for item in items { + let new = self.local_insert(&local_timestamp_tree, item)?; - if let Some(updated) = new { - updated_vec.push(updated); + if let Some(updated) = new { + updated_vec.push(updated); + } } } @@ -352,12 +363,11 @@ impl K2VRpcHandler { Ok(K2VRpc::Ok) } - fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> { - // Using a mutex on the local_timestamp_tree is not strictly necessary, - // but it helps to not try to do several inserts at the same time, - // which would create transaction conflicts and force many useless retries. - let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); - + fn local_insert( + &self, + local_timestamp_tree: &MutexGuard<'_, db::Tree>, + item: &InsertedItem, + ) -> Result<Option<K2VItem>, Error> { let now = now_msec(); self.item_table |