aboutsummaryrefslogtreecommitdiff
path: root/src/rpc_server.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-12 23:05:53 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-12 23:05:53 +0200
commit43ce5e4ab4ebe317bb9263de5d56b90dc68ea7eb (patch)
treed0757a5907dee585948b5a0a75ed689b608338c8 /src/rpc_server.rs
parent2bea76ce16c5dfc3ddead65adcd7b0bed2cf9530 (diff)
downloadgarage-43ce5e4ab4ebe317bb9263de5d56b90dc68ea7eb.tar.gz
garage-43ce5e4ab4ebe317bb9263de5d56b90dc68ea7eb.zip
Fix table RPC to not be interruptible
Diffstat (limited to 'src/rpc_server.rs')
-rw-r--r--src/rpc_server.rs26
1 files changed, 15 insertions, 11 deletions
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index 17da6f86..b75d67fd 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -76,22 +76,26 @@ async fn handler(
// and the request handler simply sits there waiting for the task to finish.
// (if it's cancelled, that's not an issue)
// (TODO FIXME except if garage happens to shut down at that point)
- let write_fut = async move { garage.block_manager.write_block(&m.hash, &m.data).await };
+ let write_fut = async move {
+ garage.block_manager.write_block(&m.hash, &m.data).await
+ };
tokio::spawn(write_fut).await?
}
Message::GetBlock(h) => garage.block_manager.read_block(&h).await,
Message::TableRPC(table, msg) => {
- // For now, table RPCs use transactions that are not async so even if the future
- // is canceled, the db should be in a consistent state.
- if let Some(rpc_handler) = garage.table_rpc_handlers.get(&table) {
- rpc_handler
- .handle(&msg[..])
- .await
- .map(|rep| Message::TableRPC(table.to_string(), rep))
- } else {
- Ok(Message::Error(format!("Unknown table: {}", table)))
- }
+ // Same trick for table RPCs than for PutBlock
+ let op_fut = async move {
+ if let Some(rpc_handler) = garage.table_rpc_handlers.get(&table) {
+ rpc_handler
+ .handle(&msg[..])
+ .await
+ .map(|rep| Message::TableRPC(table.to_string(), rep))
+ } else {
+ Ok(Message::Error(format!("Unknown table: {}", table)))
+ }
+ };
+ tokio::spawn(op_fut).await?
}
_ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))),