use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use arc_swap::ArcSwapOption; use async_trait::async_trait; use log::*; use futures::io::{AsyncReadExt, AsyncWriteExt}; use kuska_handshake::async_std::{handshake_server, BoxStream}; use tokio::net::TcpStream; use tokio::select; use tokio::sync::{mpsc, watch}; use tokio_util::compat::*; #[cfg(feature = "telemetry")] use opentelemetry::{ trace::{FutureExt, Span, SpanKind, TraceContextExt, TraceId, Tracer}, Context, KeyValue, }; #[cfg(feature = "telemetry")] use opentelemetry_contrib::trace::propagator::binary::*; #[cfg(feature = "telemetry")] use rand::{thread_rng, Rng}; use crate::error::*; use crate::message::*; use crate::netapp::*; use crate::recv::*; use crate::send::*; use crate::stream::*; use crate::util::*; // The client and server connection structs (client.rs and server.rs) // build upon the chunking mechanism which is exclusively contained // in proto.rs. // Here, we just care about sending big messages without size limit. // The format of these messages is described below. // Chunking happens independently. // Request message format (client -> server): // - u8 priority // - u8 path length // - [u8; path length] path // - [u8; *] data // Response message format (server -> client): // - u8 response code // - [u8; *] response pub(crate) struct ServerConn { pub(crate) remote_addr: SocketAddr, pub(crate) peer_id: NodeID, netapp: Arc, resp_send: ArcSwapOption>, running_handlers: Mutex>>, } impl ServerConn { pub(crate) async fn run( netapp: Arc, socket: TcpStream, must_exit: watch::Receiver, ) -> Result<(), Error> { let remote_addr = socket.peer_addr()?; let mut socket = socket.compat(); // Do handshake to authenticate client let handshake = handshake_server( &mut socket, netapp.netid.clone(), netapp.id, netapp.privkey.clone(), ) .await?; let peer_id = handshake.peer_pk; debug!( "Handshake complete (server) with {}@{}", hex::encode(peer_id), remote_addr ); // Create BoxStream layer that encodes content let (read, write) = socket.split(); let (read, mut write) = BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write(); // Before doing anything, send version tag, so that client // can check and disconnect if version is wrong write.write_all(&netapp.version_tag[..]).await?; write.flush().await?; // Build and launch stuff that handles requests server-side let (resp_send, resp_recv) = mpsc::unbounded_channel(); let conn = Arc::new(ServerConn { netapp: netapp.clone(), remote_addr, peer_id, resp_send: ArcSwapOption::new(Some(Arc::new(resp_send))), running_handlers: Mutex::new(HashMap::new()), }); netapp.connected_as_server(peer_id, conn.clone()); let debug_name = format!("SRV {}", hex::encode(&peer_id[..8])); let debug_name_2 = debug_name.clone(); let conn2 = conn.clone(); let recv_future = tokio::spawn(async move { select! { r = conn2.recv_loop(read, debug_name_2) => r, _ = await_exit(must_exit) => Ok(()) } }); let send_future = tokio::spawn(conn.clone().send_loop(resp_recv, write, debug_name)); recv_future.await.log_err("ServerConn recv_loop"); conn.resp_send.store(None); send_future.await.log_err("ServerConn send_loop"); netapp.disconnected_as_server(&peer_id, conn); Ok(()) } async fn recv_handler_aux(self: &Arc, req_enc: ReqEnc) -> Result { let path = String::from_utf8(req_enc.path.to_vec())?; let handler_opt = { let endpoints = self.netapp.endpoints.read().unwrap(); endpoints.get(&path).map(|e| e.clone_endpoint()) }; if let Some(handler) = handler_opt { cfg_if::cfg_if! { if #[cfg(feature = "telemetry")] { let tracer = opentelemetry::global::tracer("netapp"); let mut span = if !req_enc.telemetry_id.is_empty() { let propagator = BinaryPropagator::new(); let context = propagator.deserialize_from_bytes(req_enc.telemetry_id.to_vec()); let context = Context::new().with_remote_span_context(context); tracer.span_builder(format!(">> RPC {}", path)) .with_kind(SpanKind::Server) .start_with_context(&tracer, &context) } else { let mut rng = thread_rng(); let trace_id = TraceId::from_bytes(rng.gen()); tracer .span_builder(format!(">> RPC {}", path)) .with_kind(SpanKind::Server) .with_trace_id(trace_id) .start(&tracer) }; span.set_attribute(KeyValue::new("path", path.to_string())); span.set_attribute(KeyValue::new("len_query_msg", req_enc.msg.len() as i64)); handler.handle(req_enc, self.peer_id) .with_context(Context::current_with_span(span)) .await } else { handler.handle(req_enc, self.peer_id).await } } } else { Err(Error::NoHandler) } } } impl SendLoop for ServerConn {} #[async_trait] impl RecvLoop for ServerConn { fn recv_handler(self: &Arc, id: RequestID, stream: ByteStream) { let resp_send = match self.resp_send.load_full() { Some(c) => c, None => return, }; let mut rh = self.running_handlers.lock().unwrap(); let self2 = self.clone(); let jh = tokio::spawn(async move { debug!("server: recv_handler got {}", id); let (prio, resp_enc_result) = match ReqEnc::decode(stream).await { Ok(req_enc) => (req_enc.prio, self2.recv_handler_aux(req_enc).await), Err(e) => (PRIO_HIGH, Err(e)), }; debug!("server: sending response to {}", id); let (resp_stream, resp_order) = RespEnc::encode(resp_enc_result); resp_send .send(SendItem::Stream(id, prio, resp_order, resp_stream)) .log_err("ServerConn recv_handler send resp bytes"); self2.running_handlers.lock().unwrap().remove(&id); }); rh.insert(id, jh); } fn cancel_handler(self: &Arc, id: RequestID) { trace!("received cancel for request {}", id); // If the handler is still running, abort it now if let Some(jh) = self.running_handlers.lock().unwrap().remove(&id) { jh.abort(); } // Inform the response sender that we don't need to send the response if let Some(resp_send) = self.resp_send.load_full() { let _ = resp_send.send(SendItem::Cancel(id)); } } }