diff options
author | Alex Auvolat <alex@adnab.me> | 2021-10-12 17:59:46 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-10-12 17:59:46 +0200 |
commit | f87dbe73dc12f2d6eb13850a3bc4b012aadd3c9b (patch) | |
tree | 5407c8eab331d066e66f5193d51f6fd66bedb9bb /src/conn.rs | |
parent | 040231d554b74e981644e606c096ced6fc36a2ad (diff) | |
download | netapp-f87dbe73dc12f2d6eb13850a3bc4b012aadd3c9b.tar.gz netapp-f87dbe73dc12f2d6eb13850a3bc4b012aadd3c9b.zip |
WIP v0.3.0 with changed API
Diffstat (limited to 'src/conn.rs')
-rw-r--r-- | src/conn.rs | 90 |
1 files changed, 69 insertions, 21 deletions
diff --git a/src/conn.rs b/src/conn.rs index c2c9c8b..64318dc 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::net::SocketAddr; -use std::sync::atomic::{self, AtomicBool, AtomicU16}; +use std::sync::atomic::{self, AtomicBool, AtomicU32}; use std::sync::{Arc, Mutex}; use bytes::Bytes; @@ -16,12 +16,22 @@ use async_trait::async_trait; use kuska_handshake::async_std::{handshake_client, handshake_server, BoxStream}; +use crate::endpoint::*; use crate::error::*; -use crate::message::*; use crate::netapp::*; use crate::proto::*; use crate::util::*; +// Request message format (client -> server): +// - u8 priority +// - u8 path length +// - [u8; path length] path +// - [u8; *] data + +// Response message format (server -> client): +// - u8 response code +// - [u8; *] response + pub(crate) struct ServerConn { pub(crate) remote_addr: SocketAddr, pub(crate) peer_id: NodeID, @@ -99,30 +109,60 @@ impl ServerConn { pub fn close(&self) { self.close_send.send(true).unwrap(); } + + async fn recv_handler_aux(self: &Arc<Self>, bytes: &[u8]) -> Result<Vec<u8>, Error> { + if bytes.len() < 2 { + return Err(Error::Message("Invalid protocol message".into())); + } + + // byte 0 is the request priority, we don't care here + let path_length = bytes[1] as usize; + if bytes.len() < 2 + path_length { + return Err(Error::Message("Invalid protocol message".into())); + } + + let path = &bytes[2..2 + path_length]; + let path = String::from_utf8(path.to_vec())?; + let data = &bytes[2 + path_length..]; + + let handler_opt = { + let endpoints = self.netapp.endpoints.read().unwrap(); + endpoints.get(&path).map(|e| e.clone_endpoint()) + }; + + if let Some(handler) = handler_opt { + handler.handle(data, self.peer_id).await + } else { + Err(Error::NoHandler) + } + } } impl SendLoop for ServerConn {} #[async_trait] impl RecvLoop for ServerConn { - async fn recv_handler(self: Arc<Self>, id: u16, bytes: Vec<u8>) { + async fn recv_handler(self: Arc<Self>, id: RequestID, bytes: Vec<u8>) { trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len()); - let bytes: Bytes = bytes.into(); + let resp = self.recv_handler_aux(&bytes[..]).await; let prio = bytes[0]; - let mut kind_bytes = [0u8; 4]; - kind_bytes.copy_from_slice(&bytes[1..5]); - let kind = u32::from_be_bytes(kind_bytes); - - if let Some(handler) = self.netapp.msg_handlers.load().get(&kind) { - let net_handler = &handler.net_handler; - let resp = net_handler(self.peer_id, bytes.slice(5..)).await; - self.resp_send - .send(Some((id, prio, resp))) - .log_err("ServerConn recv_handler send resp"); + let mut resp_bytes = vec![]; + match resp { + Ok(rb) => { + resp_bytes.push(0u8); + resp_bytes.extend(&rb[..]); + } + Err(e) => { + resp_bytes.push(e.code()); + } } + + self.resp_send + .send(Some((id, prio, resp_bytes))) + .log_err("ServerConn recv_handler send resp"); } } pub(crate) struct ClientConn { @@ -131,7 +171,7 @@ pub(crate) struct ClientConn { query_send: mpsc::UnboundedSender<Option<(RequestID, RequestPriority, Vec<u8>)>>, - next_query_number: AtomicU16, + next_query_number: AtomicU32, inflight: Mutex<HashMap<RequestID, oneshot::Sender<Vec<u8>>>>, must_exit: AtomicBool, stop_recv_loop: watch::Sender<bool>, @@ -173,7 +213,7 @@ impl ClientConn { let conn = Arc::new(ClientConn { remote_addr, peer_id, - next_query_number: AtomicU16::from(0u16), + next_query_number: AtomicU32::from(RequestID::default()), query_send, inflight: Mutex::new(HashMap::new()), must_exit: AtomicBool::new(false), @@ -212,9 +252,10 @@ impl ClientConn { } } - pub(crate) async fn request<T>( + pub(crate) async fn call<T>( self: Arc<Self>, rq: T, + path: &str, prio: RequestPriority, ) -> Result<<T as Message>::Response, Error> where @@ -222,9 +263,9 @@ impl ClientConn { { let id = self .next_query_number - .fetch_add(1u16, atomic::Ordering::Relaxed); - let mut bytes = vec![prio]; - bytes.extend_from_slice(&u32::to_be_bytes(T::KIND)[..]); + .fetch_add(1, atomic::Ordering::Relaxed); + let mut bytes = vec![prio, path.as_bytes().len() as u8]; + bytes.extend_from_slice(path.as_bytes()); bytes.extend_from_slice(&rmp_to_vec_all_named(&rq)?[..]); let (resp_send, resp_recv) = oneshot::channel(); @@ -243,8 +284,15 @@ impl ClientConn { let resp = resp_recv.await?; - rmp_serde::decode::from_read_ref::<_, Result<<T as Message>::Response, String>>(&resp[..])? + let code = resp[0]; + if code == 0 { + rmp_serde::decode::from_read_ref::<_, Result<<T as Message>::Response, String>>( + &resp[1..], + )? .map_err(Error::Remote) + } else { + Err(Error::Remote(format!("Remote error code {}", code))) + } } } |