diff options
Diffstat (limited to 'examples')
-rw-r--r-- | examples/basalt.rs | 164 |
1 files changed, 164 insertions, 0 deletions
diff --git a/examples/basalt.rs b/examples/basalt.rs new file mode 100644 index 0000000..4ea4f71 --- /dev/null +++ b/examples/basalt.rs @@ -0,0 +1,164 @@ +use std::io::Write; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; + +use log::{debug, info, warn}; + +use serde::{Deserialize, Serialize}; +use structopt::StructOpt; + +use sodiumoxide::crypto::auth; +use sodiumoxide::crypto::sign::ed25519; + +use netapp::message::*; +use netapp::peering::basalt::*; +use netapp::proto::*; +use netapp::NetApp; + +#[derive(StructOpt, Debug)] +#[structopt(name = "netapp")] +pub struct Opt { + #[structopt(long = "network-key", short = "n")] + network_key: Option<String>, + + #[structopt(long = "private-key", short = "p")] + private_key: Option<String>, + + #[structopt(long = "bootstrap-peer", short = "b")] + bootstrap_peers: Vec<String>, + + #[structopt(long = "listen-addr", short = "l", default_value = "127.0.0.1:1980")] + listen_addr: String, + + #[structopt(long = "public-addr", short = "a")] + public_addr: Option<String>, + + #[structopt(long = "view-size", short = "v", default_value = "100")] + view_size: usize, + + #[structopt(long = "cache-size", short = "c", default_value = "1000")] + cache_size: usize, + + #[structopt(long = "exchange-interval-secs", short = "x", default_value = "1")] + exchange_interval: u64, + + #[structopt(long = "reset-interval-secs", short = "r", default_value = "10")] + reset_interval: u64, + + #[structopt(long = "reset-count", short = "k", default_value = "20")] + reset_count: usize, +} + +#[tokio::main] +async fn main() { + env_logger::Builder::new() + .parse_env("RUST_LOG") + .format(|buf, record| { + writeln!( + buf, + "{} {} {} {}", + chrono::Local::now().format("%s%.6f"), + record.module_path().unwrap_or("_"), + record.level(), + record.args() + ) + }) + .init(); + + let opt = Opt::from_args(); + + let netid = match &opt.network_key { + Some(k) => auth::Key::from_slice(&hex::decode(k).unwrap()).unwrap(), + None => auth::gen_key(), + }; + info!("KYEV NK {}", hex::encode(&netid)); + + let privkey = match &opt.private_key { + Some(k) => ed25519::SecretKey::from_slice(&hex::decode(k).unwrap()).unwrap(), + None => { + let (_pk, sk) = ed25519::gen_keypair(); + sk + } + }; + + info!("KYEV SK {}", hex::encode(&privkey)); + info!("KYEV PK {}", hex::encode(&privkey.public_key())); + + let netapp = NetApp::new(netid, privkey); + + let mut bootstrap_peers = vec![]; + for peer in opt.bootstrap_peers.iter() { + if let Some(delim) = peer.find('@') { + let (key, ip) = peer.split_at(delim); + let pubkey = ed25519::PublicKey::from_slice(&hex::decode(&key).unwrap()).unwrap(); + let ip = ip[1..].parse::<SocketAddr>().unwrap(); + bootstrap_peers.push((pubkey, ip)); + } + } + + let basalt_params = BasaltParams { + view_size: opt.view_size, + cache_size: opt.cache_size, + exchange_interval: Duration::from_secs(opt.exchange_interval), + reset_interval: Duration::from_secs(opt.reset_interval), + reset_count: opt.reset_count, + }; + let peering = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params); + + netapp.add_msg_handler::<ExampleMessage, _, _>( + |_from: ed25519::PublicKey, msg: ExampleMessage| { + debug!("Got example message: {:?}, sending example response", msg); + async { + ExampleResponse { + example_field: false, + } + } + }, + ); + + let listen_addr = opt.listen_addr.parse().unwrap(); + let public_addr = opt.public_addr.map(|x| x.parse().unwrap()); + tokio::join!( + sampling_loop(netapp.clone(), peering.clone()), + netapp.listen(listen_addr, public_addr), + peering.run(), + ); +} + +async fn sampling_loop(netapp: Arc<NetApp>, basalt: Arc<Basalt>) { + loop { + tokio::time::delay_for(Duration::from_secs(10)).await; + + let peers = basalt.sample(10); + for p in peers { + debug!("kyev S {}", hex::encode(p)); + + let netapp2 = netapp.clone(); + tokio::spawn(async move { + match netapp2 + .request(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL) + .await + { + Ok(resp) => debug!("Got example response: {:?}", resp), + Err(e) => warn!("Error with example request: {}", e), + } + }); + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +struct ExampleMessage { + example_field: usize, +} + +#[derive(Serialize, Deserialize, Debug)] +struct ExampleResponse { + example_field: bool, +} + +impl Message for ExampleMessage { + const KIND: MessageKind = 0x99000001; + type Response = ExampleResponse; +} |