aboutsummaryrefslogtreecommitdiff
path: root/src/net/test.rs
blob: 19759ccf793553b43192e1d8930390c05d1a8106 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use tokio::select;
use tokio::sync::watch;

use sodiumoxide::crypto::auth;
use sodiumoxide::crypto::sign::ed25519;

use crate::netapp::*;
use crate::peering::fullmesh::*;
use crate::NodeID;

/// Runs the peering scenario on tokio's `current_thread` runtime flavor.
///
/// Also installs `env_logger` so that log output of the nodes is available
/// while the test runs.
#[tokio::test(flavor = "current_thread")]
async fn test_with_basic_scheduler() {
	// `env_logger::init()` panics when a global logger has already been
	// installed (for instance by another test in the same test binary).
	// Logging is only a debugging aid here, so a failed installation is
	// deliberately ignored instead of aborting the test.
	let _ = env_logger::try_init();
	run_test().await
}

/// Runs the peering scenario on tokio's `multi_thread` runtime flavor with
/// four worker threads.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_with_threaded_scheduler() {
	run_test().await;
}

async fn run_test() {
	select! {
		_ = run_test_inner() => (),
		_ = tokio::time::sleep(Duration::from_secs(20)) => panic!("timeout"),
	}
}

/// Three-node peering scenario on the loopback interface.
///
/// Steps:
/// 1. Start node 1 without bootstrap peers and wait 2 seconds.
/// 2. Start node 2 bootstrapped with node 1, wait 5 seconds, then check that
///    the peer lists of nodes 1 and 2 both contain 2 entries.
/// 3. Start node 3 bootstrapped with node 2 only, wait 5 seconds, then check
///    that the peer lists of all three nodes contain 3 entries.
/// 4. Broadcast the stop signal and wait for the three node tasks to finish.
///
/// Every invocation listens on its own block of three consecutive ports
/// (starting at 19991 for the first invocation), so several invocations can run
/// at the same time inside one test process.
///
/// # Panics
///
/// Panics if one of the peer-list assertions fails, if the stop signal cannot
/// be sent, or if one of the node tasks did not terminate cleanly.
async fn run_test_inner() {
	// This scenario is started by more than one `#[tokio::test]`, and the test
	// harness runs tests concurrently by default. Hard-coded ports would make
	// those runs compete for the same listening addresses, so each invocation
	// reserves a distinct block of three ports instead.
	static NEXT_PORT: std::sync::atomic::AtomicU16 = std::sync::atomic::AtomicU16::new(19991);
	let port_base = NEXT_PORT.fetch_add(3, std::sync::atomic::Ordering::SeqCst);
	let loopback = |offset: u16| SocketAddr::from(([127, 0, 0, 1], port_base + offset));

	let netid = auth::gen_key();

	let (pk1, sk1) = ed25519::gen_keypair();
	let (pk2, sk2) = ed25519::gen_keypair();
	let (pk3, sk3) = ed25519::gen_keypair();

	let addr1 = loopback(0);
	let addr2 = loopback(1);
	let addr3 = loopback(2);

	// A single watch channel is shared by all nodes as their shutdown signal.
	let (stop_tx, stop_rx) = watch::channel(false);

	// Start the first node on its own; it has nobody to bootstrap from
	let (thread1, _netapp1, peering1) =
		run_netapp(netid.clone(), pk1, sk1, addr1, vec![], stop_rx.clone());
	tokio::time::sleep(Duration::from_secs(2)).await;

	// Connect second node and check it peers with everyone
	let (thread2, _netapp2, peering2) = run_netapp(
		netid.clone(),
		pk2,
		sk2,
		addr2,
		vec![(pk1, addr1)],
		stop_rx.clone(),
	);
	tokio::time::sleep(Duration::from_secs(5)).await;

	let pl1 = peering1.get_peer_list();
	println!("A pl1: {:?}", pl1);
	assert_eq!(pl1.len(), 2);

	let pl2 = peering2.get_peer_list();
	println!("A pl2: {:?}", pl2);
	assert_eq!(pl2.len(), 2);

	// Connect third node and check it peers with everyone
	// (it is only told about node 2, so node 1 has to be learned indirectly)
	let (thread3, _netapp3, peering3) =
		run_netapp(netid, pk3, sk3, addr3, vec![(pk2, addr2)], stop_rx.clone());
	tokio::time::sleep(Duration::from_secs(5)).await;

	let pl1 = peering1.get_peer_list();
	println!("B pl1: {:?}", pl1);
	assert_eq!(pl1.len(), 3);

	let pl2 = peering2.get_peer_list();
	println!("B pl2: {:?}", pl2);
	assert_eq!(pl2.len(), 3);

	let pl3 = peering3.get_peer_list();
	println!("B pl3: {:?}", pl3);
	assert_eq!(pl3.len(), 3);

	// Send stop signal and wait for everyone to finish
	stop_tx.send(true).unwrap();
	thread1.await.unwrap();
	thread2.await.unwrap();
	thread3.await.unwrap();
}

/// Builds one node for the test and starts it on the current tokio runtime.
///
/// A `NetApp` is created from `netid` and `sk`, and a
/// `FullMeshPeeringStrategy` is created on top of it with `bootstrap_peers`.
/// A single spawned task then drives `listen` on `listen_addr` together with
/// the peering strategy's `run`, each being handed its own clone of
/// `must_exit`.
///
/// `_pk` is not used by this function.
///
/// Returns the join handle of the spawned task, followed by the `NetApp` and
/// the peering strategy.
fn run_netapp(
	netid: auth::Key,
	_pk: NodeID,
	sk: ed25519::SecretKey,
	listen_addr: SocketAddr,
	bootstrap_peers: Vec<(NodeID, SocketAddr)>,
	must_exit: watch::Receiver<bool>,
) -> (
	tokio::task::JoinHandle<()>,
	Arc<NetApp>,
	Arc<FullMeshPeeringStrategy>,
) {
	// NOTE(review): the meaning of the leading `0u64` argument is not visible
	// from this file — presumably a version tag; confirm against `NetApp::new`.
	let netapp = NetApp::new(0u64, netid, sk);
	let peering = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers, None);

	// The background task works on its own clones so that the originals can be
	// handed back to the caller.
	let task = {
		let mesh = peering.clone();
		let node = netapp.clone();
		tokio::spawn(async move {
			let listening = node.listen(listen_addr, None, must_exit.clone());
			let meshing = mesh.run(must_exit.clone());
			// Run both until each of them has returned.
			tokio::join!(listening, meshing);
		})
	};

	(task, netapp, peering)
}