aboutsummaryrefslogtreecommitdiff
path: root/src/netapp.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/netapp.rs')
-rw-r--r--src/netapp.rs172
1 files changed, 87 insertions, 85 deletions
diff --git a/src/netapp.rs b/src/netapp.rs
index 8397be9..9733fb7 100644
--- a/src/netapp.rs
+++ b/src/netapp.rs
@@ -28,9 +28,7 @@ pub(crate) struct Handler {
pub(crate) local_handler:
Box<dyn Fn(DynMsg) -> Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> + Sync + Send>,
pub(crate) net_handler: Box<
- dyn Fn(ed25519::PublicKey, Bytes) -> Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>>
- + Sync
- + Send,
+ dyn Fn(NodeID, Bytes) -> Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> + Sync + Send,
>,
}
@@ -47,30 +45,32 @@ pub(crate) struct Handler {
/// It is generally not necessary to use NetApp stand-alone, as the provided full mesh
/// and RPS peering strategies take care of the most common use cases.
pub struct NetApp {
- pub listen_addr: SocketAddr,
- pub public_addr: Option<IpAddr>,
+ listen_params: ArcSwapOption<ListenParams>,
+ /// Network secret key
pub netid: auth::Key,
- pub pubkey: ed25519::PublicKey,
+ /// Our peer ID
+ pub id: NodeID,
+ /// Private key associated with our peer ID
pub privkey: ed25519::SecretKey,
- server_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ServerConn>>>,
- client_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ClientConn>>>,
+ server_conns: RwLock<HashMap<NodeID, Arc<ServerConn>>>,
+ client_conns: RwLock<HashMap<NodeID, Arc<ClientConn>>>,
pub(crate) msg_handlers: ArcSwap<HashMap<MessageKind, Arc<Handler>>>,
- on_connected_handler:
- ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, SocketAddr, bool) + Send + Sync>>,
- on_disconnected_handler: ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, bool) + Send + Sync>>,
+ on_connected_handler: ArcSwapOption<Box<dyn Fn(NodeID, SocketAddr, bool) + Send + Sync>>,
+ on_disconnected_handler: ArcSwapOption<Box<dyn Fn(NodeID, bool) + Send + Sync>>,
}
-async fn net_handler_aux<M, F, R>(
- handler: Arc<F>,
- remote: ed25519::PublicKey,
- bytes: Bytes,
-) -> Vec<u8>
+struct ListenParams {
+ listen_addr: SocketAddr,
+ public_addr: Option<IpAddr>,
+}
+
+async fn net_handler_aux<M, F, R>(handler: Arc<F>, remote: NodeID, bytes: Bytes) -> Vec<u8>
where
M: Message + 'static,
- F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static,
+ F: Fn(NodeID, M) -> R + Send + Sync + 'static,
R: Future<Output = <M as Message>::Response> + Send + Sync,
{
debug!(
@@ -93,14 +93,10 @@ where
rmp_to_vec_all_named(&res).unwrap_or(vec![])
}
-async fn local_handler_aux<M, F, R>(
- handler: Arc<F>,
- remote: ed25519::PublicKey,
- msg: DynMsg,
-) -> DynMsg
+async fn local_handler_aux<M, F, R>(handler: Arc<F>, remote: NodeID, msg: DynMsg) -> DynMsg
where
M: Message + 'static,
- F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static,
+ F: Fn(NodeID, M) -> R + Send + Sync + 'static,
R: Future<Output = <M as Message>::Response> + Send + Sync,
{
debug!("Handling message of kind {:08x} from ourself", M::KIND);
@@ -110,19 +106,17 @@ where
}
impl NetApp {
- /// Creates a new instance of NetApp. No background process is
- pub fn new(
- listen_addr: SocketAddr,
- public_addr: Option<IpAddr>,
- netid: auth::Key,
- privkey: ed25519::SecretKey,
- ) -> Arc<Self> {
- let pubkey = privkey.public_key();
+ /// Creates a new instance of NetApp, which can serve either as a full p2p node,
+ /// or just as a passive client. To upgrade to a full p2p node, spawn a listener
+ /// using `.listen()`
+ ///
+ /// Our Peer ID is the public key associated to the secret key given here.
+ pub fn new(netid: auth::Key, privkey: ed25519::SecretKey) -> Arc<Self> {
+ let id = privkey.public_key();
let netapp = Arc::new(Self {
- listen_addr,
- public_addr,
+ listen_params: ArcSwapOption::new(None),
netid,
- pubkey,
+ id,
privkey,
server_conns: RwLock::new(HashMap::new()),
client_conns: RwLock::new(HashMap::new()),
@@ -132,12 +126,10 @@ impl NetApp {
});
let netapp2 = netapp.clone();
- netapp.add_msg_handler::<HelloMessage, _, _>(
- move |from: ed25519::PublicKey, msg: HelloMessage| {
- netapp2.handle_hello_message(from, msg);
- async { () }
- },
- );
+ netapp.add_msg_handler::<HelloMessage, _, _>(move |from: NodeID, msg: HelloMessage| {
+ netapp2.handle_hello_message(from, msg);
+ async { () }
+ });
netapp
}
@@ -147,7 +139,7 @@ impl NetApp {
/// as the peering strategy will need to set this itself.
pub fn on_connected<F>(&self, handler: F)
where
- F: Fn(ed25519::PublicKey, SocketAddr, bool) + Sized + Send + Sync + 'static,
+ F: Fn(NodeID, SocketAddr, bool) + Sized + Send + Sync + 'static,
{
self.on_connected_handler
.store(Some(Arc::new(Box::new(handler))));
@@ -158,7 +150,7 @@ impl NetApp {
/// as the peering strategy will need to set this itself.
pub fn on_disconnected<F>(&self, handler: F)
where
- F: Fn(ed25519::PublicKey, bool) + Sized + Send + Sync + 'static,
+ F: Fn(NodeID, bool) + Sized + Send + Sync + 'static,
{
self.on_disconnected_handler
.store(Some(Arc::new(Box::new(handler))));
@@ -169,19 +161,19 @@ impl NetApp {
pub fn add_msg_handler<M, F, R>(&self, handler: F)
where
M: Message + 'static,
- F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static,
+ F: Fn(NodeID, M) -> R + Send + Sync + 'static,
R: Future<Output = <M as Message>::Response> + Send + Sync + 'static,
{
let handler = Arc::new(handler);
let handler2 = handler.clone();
- let net_handler = Box::new(move |remote: ed25519::PublicKey, bytes: Bytes| {
+ let net_handler = Box::new(move |remote: NodeID, bytes: Bytes| {
let fun: Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> =
Box::pin(net_handler_aux(handler2.clone(), remote, bytes));
fun
});
- let self_id = self.pubkey.clone();
+ let self_id = self.id.clone();
let local_handler = Box::new(move |msg: DynMsg| {
let fun: Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> =
Box::pin(local_handler_aux(handler.clone(), self_id, msg));
@@ -200,9 +192,16 @@ impl NetApp {
/// Main listening process for our app. This future runs during the whole
/// run time of our application.
- pub async fn listen(self: Arc<Self>) {
- let mut listener = TcpListener::bind(self.listen_addr).await.unwrap();
- info!("Listening on {}", self.listen_addr);
+ /// If this is not called, the NetApp instance remains a passive client.
+ pub async fn listen(self: Arc<Self>, listen_addr: SocketAddr, public_addr: Option<IpAddr>) {
+ let listen_params = ListenParams {
+ listen_addr,
+ public_addr,
+ };
+ self.listen_params.store(Some(Arc::new(listen_params)));
+
+ let mut listener = TcpListener::bind(listen_addr).await.unwrap();
+ info!("Listening on {}", listen_addr);
loop {
// The second item contains the IP and port of the new connection.
@@ -225,43 +224,39 @@ impl NetApp {
/// This function returns once the connection has been established and a
/// successfull handshake was made. At this point we can send messages to
/// the other node with `Netapp::request`
- pub async fn try_connect(
- self: Arc<Self>,
- ip: SocketAddr,
- pk: ed25519::PublicKey,
- ) -> Result<(), Error> {
+ pub async fn try_connect(self: Arc<Self>, ip: SocketAddr, id: NodeID) -> Result<(), Error> {
// Don't connect to ourself, we don't care
// but pretend we did
- if pk == self.pubkey {
+ if id == self.id {
tokio::spawn(async move {
if let Some(h) = self.on_connected_handler.load().as_ref() {
- h(pk, ip, false);
+ h(id, ip, false);
}
});
return Ok(());
}
// Don't connect if already connected
- if self.client_conns.read().unwrap().contains_key(&pk) {
+ if self.client_conns.read().unwrap().contains_key(&id) {
return Ok(());
}
let socket = TcpStream::connect(ip).await?;
info!("Connected to {}, negotiating handshake...", ip);
- ClientConn::init(self, socket, pk.clone()).await?;
+ ClientConn::init(self, socket, id.clone()).await?;
Ok(())
}
/// Close the outgoing connection we have to a node specified by its public key,
/// if such a connection is currently open.
- pub fn disconnect(self: &Arc<Self>, pk: &ed25519::PublicKey) {
- // If pk is ourself, we're not supposed to have a connection open
- if *pk != self.pubkey {
- let conn = self.client_conns.write().unwrap().remove(pk);
+ pub fn disconnect(self: &Arc<Self>, id: &NodeID) {
+ // If id is ourself, we're not supposed to have a connection open
+ if *id != self.id {
+ let conn = self.client_conns.write().unwrap().remove(id);
if let Some(c) = conn {
debug!(
"Closing connection to {} ({})",
- hex::encode(c.peer_pk),
+ hex::encode(c.peer_id),
c.remote_addr
);
c.close();
@@ -272,24 +267,24 @@ impl NetApp {
// call on_disconnected_handler immediately, since the connection
// was removed
- // (if pk == self.pubkey, we pretend we disconnected)
- let pk = *pk;
+ // (if id == self.id, we pretend we disconnected)
+ let id = *id;
let self2 = self.clone();
tokio::spawn(async move {
if let Some(h) = self2.on_disconnected_handler.load().as_ref() {
- h(pk, false);
+ h(id, false);
}
});
}
/// Close the incoming connection from a certain client to us,
/// if such a connection is currently open.
- pub fn server_disconnect(self: &Arc<Self>, pk: &ed25519::PublicKey) {
- let conn = self.server_conns.read().unwrap().get(pk).cloned();
+ pub fn server_disconnect(self: &Arc<Self>, id: &NodeID) {
+ let conn = self.server_conns.read().unwrap().get(id).cloned();
if let Some(c) = conn {
debug!(
"Closing incoming connection from {} ({})",
- hex::encode(c.peer_pk),
+ hex::encode(c.peer_id),
c.remote_addr
);
c.close();
@@ -301,7 +296,7 @@ impl NetApp {
// Do not yet call the on_connected handler, because we don't know if the remote
// has an actual IP address and port we can call them back on.
// We will know this when they send a Hello message, which is handled below.
- pub(crate) fn connected_as_server(&self, id: ed25519::PublicKey, conn: Arc<ServerConn>) {
+ pub(crate) fn connected_as_server(&self, id: NodeID, conn: Arc<ServerConn>) {
info!("Accepted connection from {}", hex::encode(id));
self.server_conns.write().unwrap().insert(id, conn);
@@ -312,11 +307,10 @@ impl NetApp {
// At this point we know they are a full network member, and not just a client,
// and we call the on_connected handler so that the peering strategy knows
// we have a new potential peer
- fn handle_hello_message(&self, id: ed25519::PublicKey, msg: HelloMessage) {
+ fn handle_hello_message(&self, id: NodeID, msg: HelloMessage) {
if let Some(h) = self.on_connected_handler.load().as_ref() {
if let Some(c) = self.server_conns.read().unwrap().get(&id) {
- let remote_ip = msg.server_addr
- .unwrap_or(c.remote_addr.ip());
+ let remote_ip = msg.server_addr.unwrap_or(c.remote_addr.ip());
let remote_addr = SocketAddr::new(remote_ip, msg.server_port);
h(id, remote_addr, true);
}
@@ -326,7 +320,7 @@ impl NetApp {
// Called from conn.rs when an incoming connection is closed.
// We deregister the connection from server_conns and call the
// handler registered by on_disconnected
- pub(crate) fn disconnected_as_server(&self, id: &ed25519::PublicKey, conn: Arc<ServerConn>) {
+ pub(crate) fn disconnected_as_server(&self, id: &NodeID, conn: Arc<ServerConn>) {
info!("Connection from {} closed", hex::encode(id));
let mut conn_list = self.server_conns.write().unwrap();
@@ -336,7 +330,7 @@ impl NetApp {
drop(conn_list);
if let Some(h) = self.on_disconnected_handler.load().as_ref() {
- h(conn.peer_pk, true);
+ h(conn.peer_id, true);
}
}
}
@@ -349,7 +343,7 @@ impl NetApp {
// Since we are ourself listening, we send them a Hello message so that
// they know on which port to call us back. (TODO: don't do this if we are
// just a simple client and not a full p2p node)
- pub(crate) fn connected_as_client(&self, id: ed25519::PublicKey, conn: Arc<ClientConn>) {
+ pub(crate) fn connected_as_client(&self, id: NodeID, conn: Arc<ClientConn>) {
info!("Connection established to {}", hex::encode(id));
{
@@ -360,22 +354,30 @@ impl NetApp {
}
if let Some(h) = self.on_connected_handler.load().as_ref() {
- h(conn.peer_pk, conn.remote_addr, false);
+ h(conn.peer_id, conn.remote_addr, false);
}
- let server_addr = self.public_addr;
- let server_port = self.listen_addr.port();
- tokio::spawn(async move {
- conn.request(HelloMessage { server_addr, server_port }, PRIO_NORMAL)
+ if let Some(lp) = self.listen_params.load_full() {
+ let server_addr = lp.public_addr;
+ let server_port = lp.listen_addr.port();
+ tokio::spawn(async move {
+ conn.request(
+ HelloMessage {
+ server_addr,
+ server_port,
+ },
+ PRIO_NORMAL,
+ )
.await
.log_err("Sending hello message");
- });
+ });
+ }
}
// Called from conn.rs when an outgoinc connection is closed.
// The connection is removed from conn_list, and the on_disconnected handler
// is called.
- pub(crate) fn disconnected_as_client(&self, id: &ed25519::PublicKey, conn: Arc<ClientConn>) {
+ pub(crate) fn disconnected_as_client(&self, id: &NodeID, conn: Arc<ClientConn>) {
info!("Connection to {} closed", hex::encode(id));
let mut conn_list = self.client_conns.write().unwrap();
if let Some(c) = conn_list.get(id) {
@@ -384,7 +386,7 @@ impl NetApp {
drop(conn_list);
if let Some(h) = self.on_disconnected_handler.load().as_ref() {
- h(conn.peer_pk, false);
+ h(conn.peer_id, false);
}
}
}
@@ -398,14 +400,14 @@ impl NetApp {
/// The priority is an `u8`, with lower numbers meaning highest priority.
pub async fn request<T>(
&self,
- target: &ed25519::PublicKey,
+ target: &NodeID,
rq: T,
prio: RequestPriority,
) -> Result<<T as Message>::Response, Error>
where
T: Message + 'static,
{
- if *target == self.pubkey {
+ if *target == self.id {
let handler = self.msg_handlers.load().get(&T::KIND).cloned();
match handler {
None => Err(Error::Message(format!(