diff options
-rw-r--r-- | src/model/k2v/rpc.rs | 34 |
1 files changed, 22 insertions, 12 deletions
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 8b070885..04ab3ab9 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -7,7 +7,7 @@ use std::collections::{BTreeMap, HashMap}; use std::convert::TryInto; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, MutexGuard}; use std::time::Duration; use async_trait::async_trait; @@ -72,7 +72,12 @@ impl Rpc for K2VRpc { pub struct K2VRpcHandler { system: Arc<System>, item_table: Arc<Table<K2VItemTable, TableShardedReplication>>, + + // Using a mutex on the local_timestamp_tree is not strictly necessary, + // but it helps to not try to do several inserts at the same time, + // which would create transaction conflicts and force many useless retries. local_timestamp_tree: Mutex<db::Tree>, + endpoint: Arc<Endpoint<K2VRpc, Self>>, subscriptions: Arc<SubscriptionManager>, } @@ -323,7 +328,10 @@ impl K2VRpcHandler { // ---- internal handlers ---- async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> { - let new = self.local_insert(item)?; + let new = { + let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); + self.local_insert(&local_timestamp_tree, item)? + }; // Propagate to rest of network if let Some(updated) = new { @@ -336,11 +344,14 @@ impl K2VRpcHandler { async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result<K2VRpc, Error> { let mut updated_vec = vec![]; - for item in items { - let new = self.local_insert(item)?; + { + let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); + for item in items { + let new = self.local_insert(&local_timestamp_tree, item)?; - if let Some(updated) = new { - updated_vec.push(updated); + if let Some(updated) = new { + updated_vec.push(updated); + } } } @@ -352,12 +363,11 @@ impl K2VRpcHandler { Ok(K2VRpc::Ok) } - fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> { - // Using a mutex on the local_timestamp_tree is not strictly necessary, - // but it helps to not try to do several inserts at the same time, - // which would create transaction conflicts and force many useless retries. - let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); - + fn local_insert( + &self, + local_timestamp_tree: &MutexGuard<'_, db::Tree>, + item: &InsertedItem, + ) -> Result<Option<K2VItem>, Error> { let now = now_msec(); self.item_table |