aboutsummaryrefslogtreecommitdiff
path: root/src/net/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/net/client.rs')
-rw-r--r--src/net/client.rs292
1 files changed, 292 insertions, 0 deletions
diff --git a/src/net/client.rs b/src/net/client.rs
new file mode 100644
index 00000000..607dd173
--- /dev/null
+++ b/src/net/client.rs
@@ -0,0 +1,292 @@
+use std::collections::HashMap;
+use std::net::SocketAddr;
+use std::pin::Pin;
+use std::sync::atomic::{self, AtomicU32};
+use std::sync::{Arc, Mutex};
+use std::task::Poll;
+
+use arc_swap::ArcSwapOption;
+use async_trait::async_trait;
+use bytes::Bytes;
+use log::{debug, error, trace};
+
+use futures::io::AsyncReadExt;
+use futures::Stream;
+use kuska_handshake::async_std::{handshake_client, BoxStream};
+use tokio::net::TcpStream;
+use tokio::select;
+use tokio::sync::{mpsc, oneshot, watch};
+use tokio_util::compat::*;
+
+#[cfg(feature = "telemetry")]
+use opentelemetry::{
+ trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer},
+ Context, KeyValue,
+};
+#[cfg(feature = "telemetry")]
+use opentelemetry_contrib::trace::propagator::binary::*;
+
+use crate::error::*;
+use crate::message::*;
+use crate::netapp::*;
+use crate::recv::*;
+use crate::send::*;
+use crate::stream::*;
+use crate::util::*;
+
+pub(crate) struct ClientConn {
+ pub(crate) remote_addr: SocketAddr,
+ pub(crate) peer_id: NodeID,
+
+ query_send: ArcSwapOption<mpsc::UnboundedSender<SendItem>>,
+
+ next_query_number: AtomicU32,
+ inflight: Mutex<HashMap<RequestID, oneshot::Sender<ByteStream>>>,
+}
+
+impl ClientConn {
+ pub(crate) async fn init(
+ netapp: Arc<NetApp>,
+ socket: TcpStream,
+ peer_id: NodeID,
+ ) -> Result<(), Error> {
+ let remote_addr = socket.peer_addr()?;
+ let mut socket = socket.compat();
+
+ // Do handshake to authenticate and prove our identity to server
+ let handshake = handshake_client(
+ &mut socket,
+ netapp.netid.clone(),
+ netapp.id,
+ netapp.privkey.clone(),
+ peer_id,
+ )
+ .await?;
+
+ debug!(
+ "Handshake complete (client) with {}@{}",
+ hex::encode(peer_id),
+ remote_addr
+ );
+
+ // Create BoxStream layer that encodes content
+ let (read, write) = socket.split();
+ let (mut read, write) =
+ BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
+
+ // Before doing anything, receive version tag and
+ // check they are running the same version as us
+ let mut their_version_tag = VersionTag::default();
+ read.read_exact(&mut their_version_tag[..]).await?;
+ if their_version_tag != netapp.version_tag {
+ let msg = format!(
+ "different version tags: {} (theirs) vs. {} (ours)",
+ hex::encode(their_version_tag),
+ hex::encode(netapp.version_tag)
+ );
+ error!("Cannot connect to {}: {}", hex::encode(&peer_id[..8]), msg);
+ return Err(Error::VersionMismatch(msg));
+ }
+
+ // Build and launch stuff that manages sending requests client-side
+ let (query_send, query_recv) = mpsc::unbounded_channel();
+
+ let (stop_recv_loop, stop_recv_loop_recv) = watch::channel(false);
+
+ let conn = Arc::new(ClientConn {
+ remote_addr,
+ peer_id,
+ next_query_number: AtomicU32::from(RequestID::default()),
+ query_send: ArcSwapOption::new(Some(Arc::new(query_send))),
+ inflight: Mutex::new(HashMap::new()),
+ });
+
+ netapp.connected_as_client(peer_id, conn.clone());
+
+ let debug_name = format!("CLI {}", hex::encode(&peer_id[..8]));
+
+ tokio::spawn(async move {
+ let debug_name_2 = debug_name.clone();
+ let send_future = tokio::spawn(conn.clone().send_loop(query_recv, write, debug_name_2));
+
+ let conn2 = conn.clone();
+ let recv_future = tokio::spawn(async move {
+ select! {
+ r = conn2.recv_loop(read, debug_name) => r,
+ _ = await_exit(stop_recv_loop_recv) => Ok(())
+ }
+ });
+
+ send_future.await.log_err("ClientConn send_loop");
+
+ // FIXME: should do here: wait for inflight requests to all have their response
+ stop_recv_loop
+ .send(true)
+ .log_err("ClientConn send true to stop_recv_loop");
+
+ recv_future.await.log_err("ClientConn recv_loop");
+
+ // Make sure we don't wait on any more requests that won't
+ // have a response
+ conn.inflight.lock().unwrap().clear();
+
+ netapp.disconnected_as_client(&peer_id, conn);
+ });
+
+ Ok(())
+ }
+
+ pub fn close(&self) {
+ self.query_send.store(None);
+ }
+
+ pub(crate) async fn call<T>(
+ self: Arc<Self>,
+ req: Req<T>,
+ path: &str,
+ prio: RequestPriority,
+ ) -> Result<Resp<T>, Error>
+ where
+ T: Message,
+ {
+ let query_send = self.query_send.load_full().ok_or(Error::ConnectionClosed)?;
+
+ let id = self
+ .next_query_number
+ .fetch_add(1, atomic::Ordering::Relaxed);
+
+ cfg_if::cfg_if! {
+ if #[cfg(feature = "telemetry")] {
+ let tracer = opentelemetry::global::tracer("netapp");
+ let mut span = tracer.span_builder(format!("RPC >> {}", path))
+ .with_kind(SpanKind::Client)
+ .start(&tracer);
+ let propagator = BinaryPropagator::new();
+ let telemetry_id: Bytes = propagator.to_bytes(span.span_context()).to_vec().into();
+ } else {
+ let telemetry_id: Bytes = Bytes::new();
+ }
+ };
+
+ // Encode request
+ let req_enc = req.into_enc(prio, path.as_bytes().to_vec().into(), telemetry_id);
+ let req_msg_len = req_enc.msg.len();
+ let (req_stream, req_order) = req_enc.encode();
+
+ // Send request through
+ let (resp_send, resp_recv) = oneshot::channel();
+ let old = self.inflight.lock().unwrap().insert(id, resp_send);
+ if let Some(old_ch) = old {
+ error!(
+ "Too many inflight requests! RequestID collision. Interrupting previous request."
+ );
+ let _ = old_ch.send(Box::pin(futures::stream::once(async move {
+ Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "RequestID collision, too many inflight requests",
+ ))
+ })));
+ }
+
+ debug!(
+ "request: query_send {}, path {}, prio {} (serialized message: {} bytes)",
+ id, path, prio, req_msg_len
+ );
+
+ #[cfg(feature = "telemetry")]
+ span.set_attribute(KeyValue::new("len_query_msg", req_msg_len as i64));
+
+ query_send.send(SendItem::Stream(id, prio, req_order, req_stream))?;
+
+ let canceller = CancelOnDrop::new(id, query_send.as_ref().clone());
+
+ cfg_if::cfg_if! {
+ if #[cfg(feature = "telemetry")] {
+ let stream = resp_recv
+ .with_context(Context::current_with_span(span))
+ .await?;
+ } else {
+ let stream = resp_recv.await?;
+ }
+ }
+
+ let stream = Box::pin(canceller.for_stream(stream));
+
+ let resp_enc = RespEnc::decode(stream).await?;
+ debug!("client: got response to request {} (path {})", id, path);
+ Resp::from_enc(resp_enc)
+ }
+}
+
+impl SendLoop for ClientConn {}
+
+#[async_trait]
+impl RecvLoop for ClientConn {
+ fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) {
+ trace!("ClientConn recv_handler {}", id);
+
+ let mut inflight = self.inflight.lock().unwrap();
+ if let Some(ch) = inflight.remove(&id) {
+ if ch.send(stream).is_err() {
+ debug!("Could not send request response, probably because request was interrupted. Dropping response.");
+ }
+ } else {
+ debug!("Got unexpected response to request {}, dropping it", id);
+ }
+ }
+}
+
+// ----
+
+struct CancelOnDrop {
+ id: RequestID,
+ query_send: mpsc::UnboundedSender<SendItem>,
+}
+
+impl CancelOnDrop {
+ fn new(id: RequestID, query_send: mpsc::UnboundedSender<SendItem>) -> Self {
+ Self { id, query_send }
+ }
+ fn for_stream(self, stream: ByteStream) -> CancelOnDropStream {
+ CancelOnDropStream {
+ cancel: Some(self),
+ stream,
+ }
+ }
+}
+
+impl Drop for CancelOnDrop {
+ fn drop(&mut self) {
+ trace!("cancelling request {}", self.id);
+ let _ = self.query_send.send(SendItem::Cancel(self.id));
+ }
+}
+
+#[pin_project::pin_project]
+struct CancelOnDropStream {
+ cancel: Option<CancelOnDrop>,
+ #[pin]
+ stream: ByteStream,
+}
+
+impl Stream for CancelOnDropStream {
+ type Item = Packet;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let this = self.project();
+ let res = this.stream.poll_next(cx);
+ if matches!(res, Poll::Ready(None)) {
+ if let Some(c) = this.cancel.take() {
+ std::mem::forget(c)
+ }
+ }
+ res
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.stream.size_hint()
+ }
+}