diff options
Diffstat (limited to 'src/table.rs')
-rw-r--r-- | src/table.rs | 31 |
1 files changed, 18 insertions, 13 deletions
diff --git a/src/table.rs b/src/table.rs index 53e17396..a3d02d0c 100644 --- a/src/table.rs +++ b/src/table.rs @@ -204,7 +204,7 @@ where let call_futures = call_list.drain().map(|(node, entries)| async move { let rpc = TableRPC::<F>::Update(entries); - let resp = self.rpc_client.call(&node, rpc, TABLE_RPC_TIMEOUT).await?; + let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::<FuturesUnordered<_>>(); @@ -358,20 +358,28 @@ where // =============== HANDLERS FOR RPC OPERATIONS (SERVER SIDE) ============== fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) { + let self2 = self.clone(); rpc_server.add_handler::<TableRPC<F>, _, _>(path, move |msg, _addr| { - let self2 = self.clone(); - async move { self2.handle(msg).await } - }) + let self2 = self2.clone(); + async move { self2.handle(&msg).await } + }); + + let self2 = self.clone(); + self.rpc_client + .set_local_handler(self.system.id, move |msg| { + let self2 = self2.clone(); + async move { self2.handle(&msg).await } + }); } - async fn handle(self: &Arc<Self>, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> { + async fn handle(self: &Arc<Self>, msg: &TableRPC<F>) -> Result<TableRPC<F>, Error> { match msg { TableRPC::ReadEntry(key, sort_key) => { - let value = self.handle_read_entry(&key, &sort_key)?; + let value = self.handle_read_entry(key, sort_key)?; Ok(TableRPC::ReadEntryResponse(value)) } TableRPC::ReadRange(key, begin_sort_key, filter, limit) => { - let values = self.handle_read_range(&key, &begin_sort_key, &filter, limit)?; + let values = self.handle_read_range(key, begin_sort_key, filter, *limit)?; Ok(TableRPC::Update(values)) } TableRPC::Update(pairs) => { @@ -381,7 +389,7 @@ where TableRPC::SyncRPC(rpc) => { let syncer = self.syncer.load_full().unwrap(); let response = syncer - .handle_rpc(&rpc, self.system.background.stop_signal.clone()) + .handle_rpc(rpc, self.system.background.stop_signal.clone()) .await?; Ok(TableRPC::SyncRPC(response)) } @@ -433,14 +441,11 @@ where Ok(ret) } - pub async fn handle_update( - self: &Arc<Self>, - mut entries: Vec<Arc<ByteBuf>>, - ) -> Result<(), Error> { + pub async fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> { let syncer = self.syncer.load_full().unwrap(); let mut epidemic_propagate = vec![]; - for update_bytes in entries.drain(..) { + for update_bytes in entries.iter() { let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?; let tree_key = self.tree_key(update.partition_key(), update.sort_key()); |