aboutsummaryrefslogtreecommitdiff
path: root/examples/basalt.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-02-18 12:07:04 +0100
committerAlex Auvolat <alex@adnab.me>2021-02-18 12:07:04 +0100
commit4e85862a855d6a9f13ccbcccfbf512ba6db0364d (patch)
treeeca18b0de30bcefa6b00ca3fce619aa8798a9a8e /examples/basalt.rs
parent4d01a9dd0edd760b54cff5d0998325c6cfa2ba72 (diff)
downloadnetapp-4e85862a855d6a9f13ccbcccfbf512ba6db0364d.tar.gz
netapp-4e85862a855d6a9f13ccbcccfbf512ba6db0364d.zip
Publish Bᴀsᴀʟᴛ
Diffstat (limited to 'examples/basalt.rs')
-rw-r--r--examples/basalt.rs164
1 files changed, 164 insertions, 0 deletions
diff --git a/examples/basalt.rs b/examples/basalt.rs
new file mode 100644
index 0000000..4ea4f71
--- /dev/null
+++ b/examples/basalt.rs
@@ -0,0 +1,164 @@
+use std::io::Write;
+use std::net::SocketAddr;
+use std::sync::Arc;
+use std::time::Duration;
+
+use log::{debug, info, warn};
+
+use serde::{Deserialize, Serialize};
+use structopt::StructOpt;
+
+use sodiumoxide::crypto::auth;
+use sodiumoxide::crypto::sign::ed25519;
+
+use netapp::message::*;
+use netapp::peering::basalt::*;
+use netapp::proto::*;
+use netapp::NetApp;
+
+#[derive(StructOpt, Debug)]
+#[structopt(name = "netapp")]
+pub struct Opt {
+ #[structopt(long = "network-key", short = "n")]
+ network_key: Option<String>,
+
+ #[structopt(long = "private-key", short = "p")]
+ private_key: Option<String>,
+
+ #[structopt(long = "bootstrap-peer", short = "b")]
+ bootstrap_peers: Vec<String>,
+
+ #[structopt(long = "listen-addr", short = "l", default_value = "127.0.0.1:1980")]
+ listen_addr: String,
+
+ #[structopt(long = "public-addr", short = "a")]
+ public_addr: Option<String>,
+
+ #[structopt(long = "view-size", short = "v", default_value = "100")]
+ view_size: usize,
+
+ #[structopt(long = "cache-size", short = "c", default_value = "1000")]
+ cache_size: usize,
+
+ #[structopt(long = "exchange-interval-secs", short = "x", default_value = "1")]
+ exchange_interval: u64,
+
+ #[structopt(long = "reset-interval-secs", short = "r", default_value = "10")]
+ reset_interval: u64,
+
+ #[structopt(long = "reset-count", short = "k", default_value = "20")]
+ reset_count: usize,
+}
+
+#[tokio::main]
+async fn main() {
+ env_logger::Builder::new()
+ .parse_env("RUST_LOG")
+ .format(|buf, record| {
+ writeln!(
+ buf,
+ "{} {} {} {}",
+ chrono::Local::now().format("%s%.6f"),
+ record.module_path().unwrap_or("_"),
+ record.level(),
+ record.args()
+ )
+ })
+ .init();
+
+ let opt = Opt::from_args();
+
+ let netid = match &opt.network_key {
+ Some(k) => auth::Key::from_slice(&hex::decode(k).unwrap()).unwrap(),
+ None => auth::gen_key(),
+ };
+ info!("KYEV NK {}", hex::encode(&netid));
+
+ let privkey = match &opt.private_key {
+ Some(k) => ed25519::SecretKey::from_slice(&hex::decode(k).unwrap()).unwrap(),
+ None => {
+ let (_pk, sk) = ed25519::gen_keypair();
+ sk
+ }
+ };
+
+ info!("KYEV SK {}", hex::encode(&privkey));
+ info!("KYEV PK {}", hex::encode(&privkey.public_key()));
+
+ let netapp = NetApp::new(netid, privkey);
+
+ let mut bootstrap_peers = vec![];
+ for peer in opt.bootstrap_peers.iter() {
+ if let Some(delim) = peer.find('@') {
+ let (key, ip) = peer.split_at(delim);
+ let pubkey = ed25519::PublicKey::from_slice(&hex::decode(&key).unwrap()).unwrap();
+ let ip = ip[1..].parse::<SocketAddr>().unwrap();
+ bootstrap_peers.push((pubkey, ip));
+ }
+ }
+
+ let basalt_params = BasaltParams {
+ view_size: opt.view_size,
+ cache_size: opt.cache_size,
+ exchange_interval: Duration::from_secs(opt.exchange_interval),
+ reset_interval: Duration::from_secs(opt.reset_interval),
+ reset_count: opt.reset_count,
+ };
+ let peering = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params);
+
+ netapp.add_msg_handler::<ExampleMessage, _, _>(
+ |_from: ed25519::PublicKey, msg: ExampleMessage| {
+ debug!("Got example message: {:?}, sending example response", msg);
+ async {
+ ExampleResponse {
+ example_field: false,
+ }
+ }
+ },
+ );
+
+ let listen_addr = opt.listen_addr.parse().unwrap();
+ let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
+ tokio::join!(
+ sampling_loop(netapp.clone(), peering.clone()),
+ netapp.listen(listen_addr, public_addr),
+ peering.run(),
+ );
+}
+
+async fn sampling_loop(netapp: Arc<NetApp>, basalt: Arc<Basalt>) {
+ loop {
+ tokio::time::delay_for(Duration::from_secs(10)).await;
+
+ let peers = basalt.sample(10);
+ for p in peers {
+ debug!("kyev S {}", hex::encode(p));
+
+ let netapp2 = netapp.clone();
+ tokio::spawn(async move {
+ match netapp2
+ .request(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL)
+ .await
+ {
+ Ok(resp) => debug!("Got example response: {:?}", resp),
+ Err(e) => warn!("Error with example request: {}", e),
+ }
+ });
+ }
+ }
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct ExampleMessage {
+ example_field: usize,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct ExampleResponse {
+ example_field: bool,
+}
+
+impl Message for ExampleMessage {
+ const KIND: MessageKind = 0x99000001;
+ type Response = ExampleResponse;
+}