diff options
Diffstat (limited to 'src/net')
-rw-r--r-- | src/net/Cargo.toml | 1 | ||||
-rw-r--r-- | src/net/client.rs | 2 | ||||
-rw-r--r-- | src/net/endpoint.rs | 32 | ||||
-rw-r--r-- | src/net/error.rs | 2 | ||||
-rw-r--r-- | src/net/message.rs | 4 | ||||
-rw-r--r-- | src/net/netapp.rs | 6 | ||||
-rw-r--r-- | src/net/peering.rs | 5 | ||||
-rw-r--r-- | src/net/recv.rs | 2 | ||||
-rw-r--r-- | src/net/send.rs | 8 | ||||
-rw-r--r-- | src/net/server.rs | 2 | ||||
-rw-r--r-- | src/net/stream.rs | 2 | ||||
-rw-r--r-- | src/net/test.rs | 2 | ||||
-rw-r--r-- | src/net/util.rs | 2 |
13 files changed, 27 insertions, 43 deletions
diff --git a/src/net/Cargo.toml b/src/net/Cargo.toml index 686aaaea..c0b47a6e 100644 --- a/src/net/Cargo.toml +++ b/src/net/Cargo.toml @@ -30,7 +30,6 @@ rand.workspace = true log.workspace = true arc-swap.workspace = true -async-trait.workspace = true err-derive.workspace = true bytes.workspace = true cfg-if.workspace = true diff --git a/src/net/client.rs b/src/net/client.rs index 607dd173..20e1dacd 100644 --- a/src/net/client.rs +++ b/src/net/client.rs @@ -6,7 +6,6 @@ use std::sync::{Arc, Mutex}; use std::task::Poll; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use bytes::Bytes; use log::{debug, error, trace}; @@ -220,7 +219,6 @@ impl ClientConn { impl SendLoop for ClientConn {} -#[async_trait] impl RecvLoop for ClientConn { fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) { trace!("ClientConn recv_handler {}", id); diff --git a/src/net/endpoint.rs b/src/net/endpoint.rs index 3cafafeb..d46acc42 100644 --- a/src/net/endpoint.rs +++ b/src/net/endpoint.rs @@ -1,8 +1,9 @@ +use std::future::Future; use std::marker::PhantomData; use std::sync::Arc; use arc_swap::ArcSwapOption; -use async_trait::async_trait; +use futures::future::{BoxFuture, FutureExt}; use crate::error::Error; use crate::message::*; @@ -14,19 +15,17 @@ use crate::netapp::*; /// attached to the response.. /// /// The handler object should be in an Arc, see `Endpoint::set_handler` -#[async_trait] pub trait StreamingEndpointHandler<M>: Send + Sync where M: Message, { - async fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> Resp<M>; + fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> impl Future<Output = Resp<M>> + Send; } /// If one simply wants to use an endpoint in a client fashion, /// without locally serving requests to that endpoint, /// use the unit type `()` as the handler type: /// it will panic if it is ever made to handle request. -#[async_trait] impl<M: Message> EndpointHandler<M> for () { async fn handle(self: &Arc<()>, _m: &M, _from: NodeID) -> M::Response { panic!("This endpoint should not have a local handler."); @@ -38,15 +37,13 @@ impl<M: Message> EndpointHandler<M> for () { /// This trait should be implemented by an object of your application /// that can handle a message of type `M`, in the cases where it doesn't /// care about attached stream in the request nor in the response. -#[async_trait] pub trait EndpointHandler<M>: Send + Sync where M: Message, { - async fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> M::Response; + fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> impl Future<Output = M::Response> + Send; } -#[async_trait] impl<T, M> StreamingEndpointHandler<M> for T where T: EndpointHandler<M>, @@ -161,9 +158,8 @@ where pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>; -#[async_trait] pub(crate) trait GenericEndpoint { - async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error>; + fn handle(&self, req_enc: ReqEnc, from: NodeID) -> BoxFuture<Result<RespEnc, Error>>; fn drop_handler(&self); fn clone_endpoint(&self) -> DynEndpoint; } @@ -174,21 +170,23 @@ where M: Message, H: StreamingEndpointHandler<M>; -#[async_trait] impl<M, H> GenericEndpoint for EndpointArc<M, H> where M: Message, H: StreamingEndpointHandler<M> + 'static, { - async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error> { - match self.0.handler.load_full() { - None => Err(Error::NoHandler), - Some(h) => { - let req = Req::from_enc(req_enc)?; - let res = h.handle(req, from).await; - Ok(res.into_enc()?) + fn handle(&self, req_enc: ReqEnc, from: NodeID) -> BoxFuture<Result<RespEnc, Error>> { + async move { + match self.0.handler.load_full() { + None => Err(Error::NoHandler), + Some(h) => { + let req = Req::from_enc(req_enc)?; + let res = h.handle(req, from).await; + Ok(res.into_enc()?) + } } } + .boxed() } fn drop_handler(&self) { diff --git a/src/net/error.rs b/src/net/error.rs index c0aeeacc..cddb1eaa 100644 --- a/src/net/error.rs +++ b/src/net/error.rs @@ -59,7 +59,7 @@ impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error { } } -/// Ths trait adds a `.log_err()` method on `Result<(), E>` types, +/// The trait adds a `.log_err()` method on `Result<(), E>` types, /// which dismisses the error by logging it to stderr. pub trait LogError { fn log_err(self, msg: &'static str); diff --git a/src/net/message.rs b/src/net/message.rs index af98ca12..59afb058 100644 --- a/src/net/message.rs +++ b/src/net/message.rs @@ -18,7 +18,7 @@ use crate::util::*; /// in the send queue of the client, and their responses in the send queue of the /// server. Lower values mean higher priority. /// -/// This mechanism is usefull for messages bigger than the maximum chunk size +/// This mechanism is useful for messages bigger than the maximum chunk size /// (set at `0x4000` bytes), such as large file transfers. /// In such case, all of the messages in the send queue with the highest priority /// will take turns to send individual chunks, in a round-robin fashion. @@ -102,7 +102,7 @@ pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static /// The Req<M> is a helper object used to create requests and attach them /// a stream of data. If the stream is a fixed Bytes and not a ByteStream, -/// Req<M> is cheaply clonable to allow the request to be sent to different +/// Req<M> is cheaply cloneable to allow the request to be sent to different /// peers (Clone will panic if the stream is a ByteStream). pub struct Req<M: Message> { pub(crate) msg: Arc<M>, diff --git a/src/net/netapp.rs b/src/net/netapp.rs index f1e9f1ae..36c6fc88 100644 --- a/src/net/netapp.rs +++ b/src/net/netapp.rs @@ -5,7 +5,6 @@ use std::sync::{Arc, RwLock}; use log::{debug, error, info, trace, warn}; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::auth; @@ -41,7 +40,7 @@ pub(crate) type VersionTag = [u8; 16]; pub(crate) const NETAPP_VERSION_TAG: u64 = 0x6772676e65740010; // grgnet 0x0010 (1.0) /// HelloMessage is sent by the client on a Netapp connection to indicate -/// that they are also a server and ready to recieve incoming connections +/// that they are also a server and ready to receive incoming connections /// at the specified address and port. If the client doesn't know their /// public address, they don't need to specify it and we look at the /// remote address of the socket is used instead. @@ -290,7 +289,7 @@ impl NetApp { /// Attempt to connect to a peer, given by its ip:port and its public key. /// The public key will be checked during the secret handshake process. /// This function returns once the connection has been established and a - /// successfull handshake was made. At this point we can send messages to + /// successful handshake was made. At this point we can send messages to /// the other node with `Netapp::request` pub async fn try_connect(self: Arc<Self>, ip: SocketAddr, id: NodeID) -> Result<(), Error> { // Don't connect to ourself, we don't care @@ -457,7 +456,6 @@ impl NetApp { } } -#[async_trait] impl EndpointHandler<HelloMessage> for NetApp { async fn handle(self: &Arc<Self>, msg: &HelloMessage, from: NodeID) { debug!("Hello from {:?}: {:?}", hex::encode(&from[..8]), msg); diff --git a/src/net/peering.rs b/src/net/peering.rs index 168162d9..08378a08 100644 --- a/src/net/peering.rs +++ b/src/net/peering.rs @@ -5,7 +5,6 @@ use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use arc_swap::ArcSwap; -use async_trait::async_trait; use log::{debug, info, trace, warn}; use serde::{Deserialize, Serialize}; @@ -138,7 +137,7 @@ pub enum PeerConnState { /// A connection tentative is in progress (the nth, where n is the value stored) Trying(usize), - /// We abandonned trying to connect to this peer (too many failed attempts) + /// We abandoned trying to connect to this peer (too many failed attempts) Abandonned, } @@ -592,7 +591,6 @@ impl PeeringManager { } } -#[async_trait] impl EndpointHandler<PingMessage> for PeeringManager { async fn handle(self: &Arc<Self>, ping: &PingMessage, from: NodeID) -> PingMessage { let ping_resp = PingMessage { @@ -604,7 +602,6 @@ impl EndpointHandler<PingMessage> for PeeringManager { } } -#[async_trait] impl EndpointHandler<PeerListMessage> for PeeringManager { async fn handle( self: &Arc<Self>, diff --git a/src/net/recv.rs b/src/net/recv.rs index 0de7bef2..35a6d71a 100644 --- a/src/net/recv.rs +++ b/src/net/recv.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -use async_trait::async_trait; use bytes::Bytes; use log::*; @@ -50,7 +49,6 @@ impl Drop for Sender { /// according to the protocol defined above: chunks of message in progress of being /// received are stored in a buffer, and when the last chunk of a message is received, /// the full message is passed to the receive handler. -#[async_trait] pub(crate) trait RecvLoop: Sync + 'static { fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream); fn cancel_handler(self: &Arc<Self>, _id: RequestID) {} diff --git a/src/net/send.rs b/src/net/send.rs index c60fc6b2..6f1ac02c 100644 --- a/src/net/send.rs +++ b/src/net/send.rs @@ -3,7 +3,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; use log::*; @@ -28,7 +27,7 @@ use crate::stream::*; // - if error: // - u8: error kind, encoded using error::io_errorkind_to_u8 // - rest: error message -// - absent for cancel messag +// - absent for cancel message pub(crate) type RequestID = u32; pub(crate) type ChunkLength = u16; @@ -217,7 +216,7 @@ impl<'a> futures::Future for SendQueuePollNextReady<'a> { enum DataFrame { /// a fixed size buffer containing some data + a boolean indicating whether - /// there may be more data comming from this stream. Can be used for some + /// there may be more data coming from this stream. Can be used for some /// optimization. It's an error to set it to false if there is more data, but it is correct /// (albeit sub-optimal) to set it to true if there is nothing coming after Data(Bytes, bool), @@ -273,7 +272,6 @@ impl DataFrame { /// /// The `.send_loop()` exits when the sending end of the channel is closed, /// or if there is an error at any time writing to the async writer. -#[async_trait] pub(crate) trait SendLoop: Sync { async fn send_loop<W>( self: Arc<Self>, @@ -310,7 +308,7 @@ pub(crate) trait SendLoop: Sync { // recv_fut is cancellation-safe according to tokio doc, // send_fut is cancellation-safe as implemented above? tokio::select! { - biased; // always read incomming channel first if it has data + biased; // always read incoming channel first if it has data sth = recv_fut => { match sth { Some(SendItem::Stream(id, prio, order_tag, data)) => { diff --git a/src/net/server.rs b/src/net/server.rs index 36dccb2f..fb6c6366 100644 --- a/src/net/server.rs +++ b/src/net/server.rs @@ -3,7 +3,6 @@ use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use log::*; use futures::io::{AsyncReadExt, AsyncWriteExt}; @@ -174,7 +173,6 @@ impl ServerConn { impl SendLoop for ServerConn {} -#[async_trait] impl RecvLoop for ServerConn { fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) { let resp_send = match self.resp_send.load_full() { diff --git a/src/net/stream.rs b/src/net/stream.rs index 3ac6896d..c973f9a7 100644 --- a/src/net/stream.rs +++ b/src/net/stream.rs @@ -16,7 +16,7 @@ use crate::bytes_buf::BytesBuf; /// /// Items sent in the ByteStream may be errors of type `std::io::Error`. /// An error indicates the end of the ByteStream: a reader should no longer read -/// after recieving an error, and a writer should stop writing after sending an error. +/// after receiving an error, and a writer should stop writing after sending an error. pub type ByteStream = Pin<Box<dyn Stream<Item = Packet> + Send + Sync>>; /// A packet sent in a ByteStream, which may contain either diff --git a/src/net/test.rs b/src/net/test.rs index 5a3f236d..3cf446bd 100644 --- a/src/net/test.rs +++ b/src/net/test.rs @@ -66,7 +66,7 @@ async fn run_test_inner(port_base: u16) { println!("A pl2: {:?}", pl2); assert_eq!(pl2.len(), 2); - // Connect third ndoe and check it peers with everyone + // Connect third node and check it peers with everyone let (thread3, _netapp3, peering3) = run_netapp(netid, pk3, sk3, addr3, vec![(pk2, addr2)], stop_rx.clone()); tokio::time::sleep(Duration::from_secs(3)).await; diff --git a/src/net/util.rs b/src/net/util.rs index 56230b73..35a3be1e 100644 --- a/src/net/util.rs +++ b/src/net/util.rs @@ -25,7 +25,7 @@ where /// This async function returns only when a true signal was received /// from a watcher that tells us when to exit. /// -/// Usefull in a select statement to interrupt another +/// Useful in a select statement to interrupt another /// future: /// ```ignore /// select!( |