aboutsummaryrefslogtreecommitdiff
path: root/src/rpc_server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc_server.rs')
-rw-r--r--src/rpc_server.rs29
1 files changed, 21 insertions, 8 deletions
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index ddfc5e04..f54b5099 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -56,19 +56,32 @@ async fn handler(
);
let sys = garage.system.clone();
- let resp = err_to_msg(match &msg {
- Message::Ping(ping) => sys.handle_ping(&addr, ping).await,
+ let resp = err_to_msg(match msg {
+ Message::Ping(ping) => sys.handle_ping(&addr, &ping).await,
Message::PullStatus => sys.handle_pull_status(),
Message::PullConfig => sys.handle_pull_config(),
- Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(adv).await,
- Message::AdvertiseConfig(adv) => sys.handle_advertise_config(adv).await,
-
- Message::PutBlock(m) => garage.block_manager.write_block(&m.hash, &m.data).await,
+ Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(&adv).await,
+ Message::AdvertiseConfig(adv) => sys.handle_advertise_config(&adv).await,
+
+ Message::PutBlock(m) => {
+ // A RPC can be interrupted in the middle, however we don't want to write partial blocks,
+ // which might happen if the write_block() future is cancelled in the middle.
+ // To solve this, the write itself is in a spawned task that has its own separate lifetime,
+ // and the request handler simply sits there waiting for the task to finish.
+ // (if it's cancelled, that's not an issue)
+ // (TODO FIXME except if garage happens to shut down at that point)
+ let write_fut = async move {
+ garage.block_manager.write_block(&m.hash, &m.data).await
+ };
+ tokio::spawn(write_fut).await?
+ }
Message::GetBlock(h) => garage.block_manager.read_block(&h).await,
Message::TableRPC(table, msg) => {
- if let Some(rpc_handler) = garage.table_rpc_handlers.get(table) {
+ // For now, table RPCs use transactions that are not async so even if the future
+ // is canceled, the db should be in a consistent state.
+ if let Some(rpc_handler) = garage.table_rpc_handlers.get(&table) {
rpc_handler
.handle(&msg[..])
.await
@@ -90,7 +103,7 @@ pub async fn run_rpc_server(
garage: Arc<Garage>,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), Error> {
- let bind_addr = ([0, 0, 0, 0], garage.system.config.rpc_port).into();
+ let bind_addr = ([0, 0, 0, 0, 0, 0, 0, 0], garage.system.config.rpc_port).into();
let service = make_service_fn(|conn: &AddrStream| {
let client_addr = conn.remote_addr();