diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/conn.rs | 18 | ||||
-rw-r--r-- | src/error.rs | 4 | ||||
-rw-r--r-- | src/netapp.rs | 34 | ||||
-rw-r--r-- | src/peering/fullmesh.rs | 18 | ||||
-rw-r--r-- | src/proto.rs | 6 |
5 files changed, 42 insertions, 38 deletions
diff --git a/src/conn.rs b/src/conn.rs index 6c5f38a..474032d 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -39,11 +39,11 @@ impl ServerConn { let handshake = handshake_server( &mut asyncstd_socket, netapp.netid.clone(), - netapp.id.clone(), + netapp.id, netapp.privkey.clone(), ) .await?; - let peer_id = handshake.peer_pk.clone(); + let peer_id = handshake.peer_pk; let tokio_socket = asyncstd_socket.into_inner(); let remote_addr = tokio_socket.peer_addr()?; @@ -69,12 +69,12 @@ impl ServerConn { let conn = Arc::new(ServerConn { netapp: netapp.clone(), remote_addr, - peer_id: peer_id.clone(), + peer_id, resp_send, close_send, }); - netapp.connected_as_server(peer_id.clone(), conn.clone()); + netapp.connected_as_server(peer_id, conn.clone()); let conn2 = conn.clone(); let conn3 = conn.clone(); @@ -123,7 +123,7 @@ impl RecvLoop for ServerConn { if let Some(handler) = self.netapp.msg_handlers.load().get(&kind) { let net_handler = &handler.net_handler; - let resp = net_handler(self.peer_id.clone(), bytes.slice(5..)).await; + let resp = net_handler(self.peer_id, bytes.slice(5..)).await; self.resp_send .send(Some((id, prio, resp))) .log_err("ServerConn recv_handler send resp"); @@ -153,9 +153,9 @@ impl ClientConn { let handshake = handshake_client( &mut asyncstd_socket, netapp.netid.clone(), - netapp.id.clone(), + netapp.id, netapp.privkey.clone(), - peer_id.clone(), + peer_id, ) .await?; @@ -182,7 +182,7 @@ impl ClientConn { let conn = Arc::new(ClientConn { remote_addr, - peer_id: peer_id.clone(), + peer_id, next_query_number: AtomicU16::from(0u16), query_send, inflight: Mutex::new(HashMap::new()), @@ -190,7 +190,7 @@ impl ClientConn { stop_recv_loop, }); - netapp.connected_as_client(peer_id.clone(), conn.clone()); + netapp.connected_as_client(peer_id, conn.clone()); tokio::spawn(async move { let conn2 = conn.clone(); diff --git a/src/error.rs b/src/error.rs index 44a3d50..469670a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -31,13 +31,13 @@ pub enum Error { impl<T> From<tokio::sync::watch::error::SendError<T>> for Error { fn from(_e: tokio::sync::watch::error::SendError<T>) -> Error { - Error::Message(format!("Watch send error")) + Error::Message("Watch send error".into()) } } impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error { fn from(_e: tokio::sync::mpsc::error::SendError<T>) -> Error { - Error::Message(format!("MPSC send error")) + Error::Message("MPSC send error".into()) } } diff --git a/src/netapp.rs b/src/netapp.rs index 5847f98..86f14fe 100644 --- a/src/netapp.rs +++ b/src/netapp.rs @@ -24,12 +24,16 @@ use crate::util::*; type DynMsg = Box<dyn Any + Send + Sync + 'static>; +type OnConnectHandler = Box<dyn Fn(NodeID, SocketAddr, bool) + Send + Sync>; +type OnDisconnectHandler = Box<dyn Fn(NodeID, bool) + Send + Sync>; + +pub(crate) type LocalHandler = Box<dyn Fn(DynMsg) -> Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> + Sync + Send>; +pub(crate) type NetHandler = Box< + dyn Fn(NodeID, Bytes) -> Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> + Sync + Send>; + pub(crate) struct Handler { - pub(crate) local_handler: - Box<dyn Fn(DynMsg) -> Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> + Sync + Send>, - pub(crate) net_handler: Box< - dyn Fn(NodeID, Bytes) -> Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> + Sync + Send, - >, + pub(crate) local_handler: LocalHandler, + pub(crate) net_handler: NetHandler, } /// NetApp is the main class that handles incoming and outgoing connections. @@ -38,9 +42,9 @@ pub(crate) struct Handler { /// an outgoing connection, or to ourself. On the server side, these messages are /// processed by the handlers that have been defined using `add_msg_handler()`. /// -/// NetApp can be used in a stand-alone fashion or together with a peering strategy. -/// If using it alone, you will want to set `on_connect` and `on_disconnect` events -/// in order to manage information about the current peer list. +/// NetApp can be used in a stand-alone fashion or together with a peering strategy. +/// If using it alone, you will want to set `on_connect` and `on_disconnect` events +/// in order to manage information about the current peer list. /// /// It is generally not necessary to use NetApp stand-alone, as the provided full mesh /// and RPS peering strategies take care of the most common use cases. @@ -58,8 +62,8 @@ pub struct NetApp { client_conns: RwLock<HashMap<NodeID, Arc<ClientConn>>>, pub(crate) msg_handlers: ArcSwap<HashMap<MessageKind, Arc<Handler>>>, - on_connected_handler: ArcSwapOption<Box<dyn Fn(NodeID, SocketAddr, bool) + Send + Sync>>, - on_disconnected_handler: ArcSwapOption<Box<dyn Fn(NodeID, bool) + Send + Sync>>, + on_connected_handler: ArcSwapOption<OnConnectHandler>, + on_disconnected_handler: ArcSwapOption<OnDisconnectHandler>, } struct ListenParams { @@ -90,7 +94,7 @@ where hex::encode(remote), (end_time - begin_time).as_millis() ); - rmp_to_vec_all_named(&res).unwrap_or(vec![]) + rmp_to_vec_all_named(&res).unwrap_or_default() } async fn local_handler_aux<M, F, R>(handler: Arc<F>, remote: NodeID, msg: DynMsg) -> DynMsg @@ -128,7 +132,7 @@ impl NetApp { let netapp2 = netapp.clone(); netapp.add_msg_handler::<HelloMessage, _, _>(move |from: NodeID, msg: HelloMessage| { netapp2.handle_hello_message(from, msg); - async { () } + async { } }); netapp @@ -175,7 +179,7 @@ impl NetApp { fun }); - let self_id = self.id.clone(); + let self_id = self.id; let local_handler = Box::new(move |msg: DynMsg| { let fun: Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> = Box::pin(local_handler_aux(handler.clone(), self_id, msg)); @@ -248,7 +252,7 @@ impl NetApp { let socket = TcpStream::connect(ip).await?; info!("Connected to {}, negotiating handshake...", ip); - ClientConn::init(self, socket, id.clone()).await?; + ClientConn::init(self, socket, id).await?; Ok(()) } @@ -315,7 +319,7 @@ impl NetApp { fn handle_hello_message(&self, id: NodeID, msg: HelloMessage) { if let Some(h) = self.on_connected_handler.load().as_ref() { if let Some(c) = self.server_conns.read().unwrap().get(&id) { - let remote_ip = msg.server_addr.unwrap_or(c.remote_addr.ip()); + let remote_ip = msg.server_addr.unwrap_or_else(|| c.remote_addr.ip()); let remote_addr = SocketAddr::new(remote_ip, msg.server_port); h(id, remote_addr, true); } diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 9b55180..b0bbe45 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -103,7 +103,7 @@ impl KnownHosts { let mut list = Vec::with_capacity(input.len()); for (id, peer) in input.iter() { if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself { - list.push((id.clone(), peer.addr)); + list.push((*id, peer.addr)); } } list @@ -134,7 +134,7 @@ impl FullMeshPeeringStrategy { known_hosts.list.insert( id, PeerInfo { - addr: addr, + addr, state: PeerConnState::Waiting(0, Instant::now()), last_seen: None, ping: VecDeque::new(), @@ -201,12 +201,12 @@ impl FullMeshPeeringStrategy { Some(t) => Instant::now() - t > PING_INTERVAL, }; if must_ping { - to_ping.push(id.clone()); + to_ping.push(*id); } } PeerConnState::Waiting(_, t) => { if Instant::now() >= t { - to_retry.push(id.clone()); + to_retry.push(*id); } } _ => (), @@ -234,7 +234,7 @@ impl FullMeshPeeringStrategy { i + 1 ); h.state = PeerConnState::Trying(i); - tokio::spawn(self.clone().try_connect(id, h.addr.clone())); + tokio::spawn(self.clone().try_connect(id, h.addr)); } } } @@ -307,7 +307,7 @@ impl FullMeshPeeringStrategy { } async fn try_connect(self: Arc<Self>, id: NodeID, addr: SocketAddr) { - let conn_result = self.netapp.clone().try_connect(addr, id.clone()).await; + let conn_result = self.netapp.clone().try_connect(addr, id).await; if let Err(e) = conn_result { warn!("Error connecting to {}: {}", hex::encode(id), e); let mut known_hosts = self.known_hosts.write().unwrap(); @@ -362,9 +362,9 @@ impl FullMeshPeeringStrategy { for (id, info) in known_hosts.list.iter() { let mut pings = info.ping.iter().cloned().collect::<Vec<_>>(); pings.sort(); - if pings.len() > 0 { + if !pings.is_empty() { ret.push(PeerInfoPub { - id: id.clone(), + id: *id, addr: info.addr, state: info.state, last_seen: info.last_seen, @@ -379,7 +379,7 @@ impl FullMeshPeeringStrategy { }); } else { ret.push(PeerInfoPub { - id: id.clone(), + id: *id, addr: info.addr, state: info.state, last_seen: info.last_seen, diff --git a/src/proto.rs b/src/proto.rs index bfef8e7..d8f6289 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -65,7 +65,7 @@ impl SendQueue { let mut items_at_prio = self .items .remove(&prio) - .unwrap_or(VecDeque::with_capacity(4)); + .unwrap_or_else(|| VecDeque::with_capacity(4)); items_at_prio.push_back(item); self.items.insert(prio, items_at_prio); } @@ -143,7 +143,7 @@ pub(crate) trait SendLoop: Sync { let sth = msg_recv .recv() .await - .ok_or(Error::Message("Connection closed.".into()))?; + .ok_or_else(|| Error::Message("Connection closed.".into()))?; if let Some((id, prio, data)) = sth { trace!("send_loop: got {}, {} bytes", id, data.len()); sending.push(SendQueueItem { @@ -190,7 +190,7 @@ pub(crate) trait RecvLoop: Sync + 'static { read.read_exact(&mut next_slice[..]).await?; trace!("recv_loop: read {} bytes", next_slice.len()); - let mut msg_bytes = receiving.remove(&id).unwrap_or(vec![]); + let mut msg_bytes: Vec<_> = receiving.remove(&id).unwrap_or_default(); msg_bytes.extend_from_slice(&next_slice[..]); if has_cont { |