aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.drone.yml2
-rw-r--r--README.md2
-rw-r--r--examples/basalt.rs164
-rw-r--r--src/peering/basalt.rs476
-rw-r--r--src/peering/mod.rs1
5 files changed, 644 insertions, 1 deletions
diff --git a/.drone.yml b/.drone.yml
index cf4cc3e..aa99d4c 100644
--- a/.drone.yml
+++ b/.drone.yml
@@ -27,6 +27,8 @@ steps:
- apt-get update
- apt-get install --yes libsodium-dev
- cargo build
+ - cargo build --example fullmesh
+ - cargo build --example basalt
- name: rebuild-cache
image: meltwater/drone-cache:dev
diff --git a/README.md b/README.md
index d39de5f..9e56c9f 100644
--- a/README.md
+++ b/README.md
@@ -8,6 +8,6 @@ Netapp is a Rust library that takes care of a few common tasks in distributed so
- peer discovery
- query/response message passing model for communications
- multiplexing transfers over a connection
-- overlay networks: full mesh, and soon other methods
+- overlay networks: full mesh, and byzantine-tolerant random peer sampling using [Bᴀsᴀʟᴛ](https://arxiv.org/abs/2102.04063).
See examples folder to learn how to use netapp.
diff --git a/examples/basalt.rs b/examples/basalt.rs
new file mode 100644
index 0000000..4ea4f71
--- /dev/null
+++ b/examples/basalt.rs
@@ -0,0 +1,164 @@
+use std::io::Write;
+use std::net::SocketAddr;
+use std::sync::Arc;
+use std::time::Duration;
+
+use log::{debug, info, warn};
+
+use serde::{Deserialize, Serialize};
+use structopt::StructOpt;
+
+use sodiumoxide::crypto::auth;
+use sodiumoxide::crypto::sign::ed25519;
+
+use netapp::message::*;
+use netapp::peering::basalt::*;
+use netapp::proto::*;
+use netapp::NetApp;
+
+#[derive(StructOpt, Debug)]
+#[structopt(name = "netapp")]
+pub struct Opt {
+ #[structopt(long = "network-key", short = "n")]
+ network_key: Option<String>,
+
+ #[structopt(long = "private-key", short = "p")]
+ private_key: Option<String>,
+
+ #[structopt(long = "bootstrap-peer", short = "b")]
+ bootstrap_peers: Vec<String>,
+
+ #[structopt(long = "listen-addr", short = "l", default_value = "127.0.0.1:1980")]
+ listen_addr: String,
+
+ #[structopt(long = "public-addr", short = "a")]
+ public_addr: Option<String>,
+
+ #[structopt(long = "view-size", short = "v", default_value = "100")]
+ view_size: usize,
+
+ #[structopt(long = "cache-size", short = "c", default_value = "1000")]
+ cache_size: usize,
+
+ #[structopt(long = "exchange-interval-secs", short = "x", default_value = "1")]
+ exchange_interval: u64,
+
+ #[structopt(long = "reset-interval-secs", short = "r", default_value = "10")]
+ reset_interval: u64,
+
+ #[structopt(long = "reset-count", short = "k", default_value = "20")]
+ reset_count: usize,
+}
+
+#[tokio::main]
+async fn main() {
+ env_logger::Builder::new()
+ .parse_env("RUST_LOG")
+ .format(|buf, record| {
+ writeln!(
+ buf,
+ "{} {} {} {}",
+ chrono::Local::now().format("%s%.6f"),
+ record.module_path().unwrap_or("_"),
+ record.level(),
+ record.args()
+ )
+ })
+ .init();
+
+ let opt = Opt::from_args();
+
+ let netid = match &opt.network_key {
+ Some(k) => auth::Key::from_slice(&hex::decode(k).unwrap()).unwrap(),
+ None => auth::gen_key(),
+ };
+ info!("KYEV NK {}", hex::encode(&netid));
+
+ let privkey = match &opt.private_key {
+ Some(k) => ed25519::SecretKey::from_slice(&hex::decode(k).unwrap()).unwrap(),
+ None => {
+ let (_pk, sk) = ed25519::gen_keypair();
+ sk
+ }
+ };
+
+ info!("KYEV SK {}", hex::encode(&privkey));
+ info!("KYEV PK {}", hex::encode(&privkey.public_key()));
+
+ let netapp = NetApp::new(netid, privkey);
+
+ let mut bootstrap_peers = vec![];
+ for peer in opt.bootstrap_peers.iter() {
+ if let Some(delim) = peer.find('@') {
+ let (key, ip) = peer.split_at(delim);
+ let pubkey = ed25519::PublicKey::from_slice(&hex::decode(&key).unwrap()).unwrap();
+ let ip = ip[1..].parse::<SocketAddr>().unwrap();
+ bootstrap_peers.push((pubkey, ip));
+ }
+ }
+
+ let basalt_params = BasaltParams {
+ view_size: opt.view_size,
+ cache_size: opt.cache_size,
+ exchange_interval: Duration::from_secs(opt.exchange_interval),
+ reset_interval: Duration::from_secs(opt.reset_interval),
+ reset_count: opt.reset_count,
+ };
+ let peering = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params);
+
+ netapp.add_msg_handler::<ExampleMessage, _, _>(
+ |_from: ed25519::PublicKey, msg: ExampleMessage| {
+ debug!("Got example message: {:?}, sending example response", msg);
+ async {
+ ExampleResponse {
+ example_field: false,
+ }
+ }
+ },
+ );
+
+ let listen_addr = opt.listen_addr.parse().unwrap();
+ let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
+ tokio::join!(
+ sampling_loop(netapp.clone(), peering.clone()),
+ netapp.listen(listen_addr, public_addr),
+ peering.run(),
+ );
+}
+
+async fn sampling_loop(netapp: Arc<NetApp>, basalt: Arc<Basalt>) {
+ loop {
+ tokio::time::delay_for(Duration::from_secs(10)).await;
+
+ let peers = basalt.sample(10);
+ for p in peers {
+ debug!("kyev S {}", hex::encode(p));
+
+ let netapp2 = netapp.clone();
+ tokio::spawn(async move {
+ match netapp2
+ .request(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL)
+ .await
+ {
+ Ok(resp) => debug!("Got example response: {:?}", resp),
+ Err(e) => warn!("Error with example request: {}", e),
+ }
+ });
+ }
+ }
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct ExampleMessage {
+ example_field: usize,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct ExampleResponse {
+ example_field: bool,
+}
+
+impl Message for ExampleMessage {
+ const KIND: MessageKind = 0x99000001;
+ type Response = ExampleResponse;
+}
diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs
new file mode 100644
index 0000000..3c1fc9e
--- /dev/null
+++ b/src/peering/basalt.rs
@@ -0,0 +1,476 @@
+use std::collections::HashSet;
+use std::net::SocketAddr;
+use std::sync::{Arc, RwLock};
+use std::time::Duration;
+
+use log::{debug, info, trace, warn};
+use lru::LruCache;
+use rand::{thread_rng, Rng};
+use serde::{Deserialize, Serialize};
+
+use sodiumoxide::crypto::hash;
+
+use crate::message::*;
+use crate::netapp::*;
+use crate::proto::*;
+use crate::NodeID;
+
+// -- Protocol messages --
+
+#[derive(Serialize, Deserialize)]
+struct PullMessage {}
+
+impl Message for PullMessage {
+ const KIND: MessageKind = 0x42001100;
+ type Response = PushMessage;
+}
+
+#[derive(Serialize, Deserialize)]
+struct PushMessage {
+ peers: Vec<Peer>,
+}
+
+impl Message for PushMessage {
+ const KIND: MessageKind = 0x42001101;
+ type Response = ();
+}
+
+// -- Algorithm data structures --
+
+type Seed = [u8; 32];
+
+#[derive(Hash, Clone, Copy, Debug, PartialOrd, PartialEq, Eq, Serialize, Deserialize)]
+struct Peer {
+ id: NodeID,
+ addr: SocketAddr,
+}
+
+type Cost = [u8; 40];
+const MAX_COST: Cost = [0xffu8; 40];
+
+impl Peer {
+ fn cost(&self, seed: &Seed) -> Cost {
+ let mut hasher = hash::State::new();
+ hasher.update(&seed[..]);
+
+ let mut cost = [0u8; 40];
+ match self.addr {
+ SocketAddr::V4(v4addr) => {
+ let v4ip = v4addr.ip().octets();
+
+ for i in 0..4 {
+ let mut h = hasher.clone();
+ h.update(&v4ip[..i + 1]);
+ cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]);
+ }
+ }
+ SocketAddr::V6(v6addr) => {
+ let v6ip = v6addr.ip().octets();
+
+ for i in 0..4 {
+ let mut h = hasher.clone();
+ h.update(&v6ip[..i + 2]);
+ cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]);
+ }
+ }
+ }
+
+ {
+ let mut h5 = hasher.clone();
+ h5.update(&format!("{} {}", self.addr, hex::encode(self.id)).into_bytes()[..]);
+ cost[32..40].copy_from_slice(&h5.finalize()[..8]);
+ }
+
+ cost
+ }
+}
+
+struct BasaltSlot {
+ seed: Seed,
+ peer: Option<Peer>,
+}
+
+impl BasaltSlot {
+ fn cost(&self) -> Cost {
+ self.peer.map(|p| p.cost(&self.seed)).unwrap_or(MAX_COST)
+ }
+}
+
+struct BasaltView {
+ i_reset: usize,
+ slots: Vec<BasaltSlot>,
+}
+
+impl BasaltView {
+ fn new(size: usize) -> Self {
+ let slots = (0..size)
+ .map(|_| BasaltSlot {
+ seed: rand_seed(),
+ peer: None,
+ })
+ .collect::<Vec<_>>();
+ Self { i_reset: 0, slots }
+ }
+
+ fn current_peers(&self) -> HashSet<Peer> {
+ self.slots
+ .iter()
+ .filter(|s| s.peer.is_some())
+ .map(|s| s.peer.unwrap().clone())
+ .collect::<HashSet<_>>()
+ }
+ fn current_peers_vec(&self) -> Vec<Peer> {
+ self.current_peers().drain().collect::<Vec<_>>()
+ }
+
+ fn sample(&self, count: usize) -> Vec<Peer> {
+ let possibles = self
+ .slots
+ .iter()
+ .enumerate()
+ .filter(|(_i, s)| s.peer.is_some())
+ .map(|(i, _s)| i)
+ .collect::<Vec<_>>();
+ if possibles.len() == 0 {
+ vec![]
+ } else {
+ let mut ret = vec![];
+ let mut rng = thread_rng();
+ for _i in 0..count {
+ let idx = rng.gen_range(0, possibles.len());
+ ret.push(self.slots[possibles[idx]].peer.unwrap());
+ }
+ ret
+ }
+ }
+
+ fn update_slot(&mut self, i: usize, peers: &[Peer]) {
+ let mut slot_cost = self.slots[i].cost();
+
+ for peer in peers.iter() {
+ let peer_cost = peer.cost(&self.slots[i].seed);
+ if self.slots[i].peer.is_none() || peer_cost < slot_cost {
+ trace!(
+ "Best match for slot {}: {}@{} (cost {})",
+ i,
+ hex::encode(peer.id),
+ peer.addr,
+ hex::encode(peer_cost)
+ );
+ self.slots[i].peer = Some(*peer);
+ slot_cost = peer_cost;
+ }
+ }
+ }
+ fn update_all_slots(&mut self, peers: &[Peer]) {
+ for i in 0..self.slots.len() {
+ self.update_slot(i, peers);
+ }
+ }
+
+ fn disconnected(&mut self, id: NodeID) {
+ let mut cleared_slots = vec![];
+ for i in 0..self.slots.len() {
+ if let Some(p) = self.slots[i].peer {
+ if p.id == id {
+ self.slots[i].peer = None;
+ cleared_slots.push(i);
+ }
+ }
+ }
+
+ let remaining_peers = self.current_peers_vec();
+
+ for i in cleared_slots {
+ self.update_slot(i, &remaining_peers[..]);
+ }
+ }
+
+ fn should_try_list(&self, peers: &[Peer]) -> Vec<Peer> {
+ // Select peers that have lower cost than any of our slots
+ let mut ret = HashSet::new();
+
+ for i in 0..self.slots.len() {
+ if self.slots[i].peer.is_none() {
+ return peers.to_vec();
+ }
+ let mut min_cost = self.slots[i].cost();
+ let mut min_peer = None;
+ for peer in peers.iter() {
+ if ret.contains(peer) {
+ continue;
+ }
+ let peer_cost = peer.cost(&self.slots[i].seed);
+ if peer_cost < min_cost {
+ min_cost = peer_cost;
+ min_peer = Some(*peer);
+ }
+ }
+ if let Some(p) = min_peer {
+ ret.insert(p);
+ if ret.len() == peers.len() {
+ break;
+ }
+ }
+ }
+
+ ret.drain().collect::<Vec<_>>()
+ }
+
+ fn reset_some_slots(&mut self, count: usize) {
+ for _i in 0..count {
+ trace!("Reset slot {}", self.i_reset);
+ self.slots[self.i_reset].seed = rand_seed();
+ self.i_reset = (self.i_reset + 1) % self.slots.len();
+ }
+ }
+}
+
+pub struct BasaltParams {
+ pub view_size: usize,
+ pub cache_size: usize,
+ pub exchange_interval: Duration,
+ pub reset_interval: Duration,
+ pub reset_count: usize,
+}
+
+pub struct Basalt {
+ netapp: Arc<NetApp>,
+
+ param: BasaltParams,
+ bootstrap_peers: Vec<Peer>,
+
+ view: RwLock<BasaltView>,
+ current_attempts: RwLock<HashSet<Peer>>,
+ backlog: RwLock<LruCache<Peer, ()>>,
+}
+
+impl Basalt {
+ pub fn new(
+ netapp: Arc<NetApp>,
+ bootstrap_list: Vec<(NodeID, SocketAddr)>,
+ param: BasaltParams,
+ ) -> Arc<Self> {
+ let bootstrap_peers = bootstrap_list
+ .iter()
+ .map(|(id, addr)| Peer {
+ id: *id,
+ addr: *addr,
+ })
+ .collect::<Vec<_>>();
+
+ let view = BasaltView::new(param.view_size);
+ let backlog = LruCache::new(param.cache_size);
+
+ let basalt = Arc::new(Self {
+ netapp: netapp.clone(),
+ param,
+ bootstrap_peers,
+ view: RwLock::new(view),
+ current_attempts: RwLock::new(HashSet::new()),
+ backlog: RwLock::new(backlog),
+ });
+
+ let basalt2 = basalt.clone();
+ netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| {
+ basalt2.on_connected(id, addr, is_incoming);
+ });
+
+ let basalt2 = basalt.clone();
+ netapp.on_disconnected(move |id: NodeID, is_incoming: bool| {
+ basalt2.on_disconnected(id, is_incoming);
+ });
+
+ let basalt2 = basalt.clone();
+ netapp.add_msg_handler::<PullMessage, _, _>(move |_from: NodeID, _pullmsg: PullMessage| {
+ let push_msg = basalt2.make_push_message();
+ async move { push_msg }
+ });
+
+ let basalt2 = basalt.clone();
+ netapp.add_msg_handler::<PushMessage, _, _>(move |_from: NodeID, push_msg: PushMessage| {
+ basalt2.handle_peer_list(&push_msg.peers[..]);
+ async move { () }
+ });
+
+ basalt
+ }
+
+ pub fn sample(&self, count: usize) -> Vec<NodeID> {
+ self.view
+ .read()
+ .unwrap()
+ .sample(count)
+ .iter()
+ .map(|p| {
+ debug!("KYEV S {}", hex::encode(p.id));
+ p.id
+ })
+ .collect::<Vec<_>>()
+ }
+
+ pub async fn run(self: Arc<Self>) {
+ for peer in self.bootstrap_peers.iter() {
+ tokio::spawn(self.clone().try_connect(*peer));
+ }
+
+ let pushpull_loop = self.clone().run_pushpull_loop();
+ let reset_loop = self.run_reset_loop();
+ tokio::join!(pushpull_loop, reset_loop);
+ }
+
+ async fn run_pushpull_loop(self: Arc<Self>) {
+ loop {
+ tokio::time::delay_for(self.param.exchange_interval).await;
+
+ let peers = self.view.read().unwrap().sample(2);
+ if peers.len() == 2 {
+ tokio::spawn(self.clone().do_pull(peers[0].id));
+ tokio::spawn(self.clone().do_push(peers[1].id));
+ }
+ }
+ }
+
+ async fn do_pull(self: Arc<Self>, peer: NodeID) {
+ match self
+ .netapp
+ .request(&peer, PullMessage {}, PRIO_NORMAL)
+ .await
+ {
+ Ok(resp) => {
+ self.handle_peer_list(&resp.peers[..]);
+ trace!("KYEV PEXi {}", hex::encode(peer));
+ }
+ Err(e) => {
+ warn!("Error during pull exchange: {}", e);
+ }
+ };
+ }
+
+ async fn do_push(self: Arc<Self>, peer: NodeID) {
+ let push_msg = self.make_push_message();
+ match self.netapp.request(&peer, push_msg, PRIO_NORMAL).await {
+ Ok(_) => {
+ trace!("KYEV PEXo {}", hex::encode(peer));
+ }
+ Err(e) => {
+ warn!("Error during push exchange: {}", e);
+ }
+ }
+ }
+
+ fn make_push_message(&self) -> PushMessage {
+ let current_peers = self.view.read().unwrap().current_peers_vec();
+ PushMessage {
+ peers: current_peers,
+ }
+ }
+
+ async fn run_reset_loop(self: Arc<Self>) {
+ loop {
+ tokio::time::delay_for(self.param.reset_interval).await;
+
+ {
+ debug!("KYEV R {}", self.param.reset_count);
+
+ let mut view = self.view.write().unwrap();
+ let prev_peers = view.current_peers();
+ let prev_peers_vec = prev_peers.iter().cloned().collect::<Vec<_>>();
+
+ view.reset_some_slots(self.param.reset_count);
+ view.update_all_slots(&prev_peers_vec[..]);
+
+ let new_peers = view.current_peers();
+ drop(view);
+
+ self.close_all_diff(&prev_peers, &new_peers);
+ }
+
+ let mut to_retry_maybe = self.bootstrap_peers.clone();
+ for (peer, _) in self.backlog.read().unwrap().iter() {
+ if !self.bootstrap_peers.contains(peer) {
+ to_retry_maybe.push(*peer);
+ }
+ }
+ self.handle_peer_list(&to_retry_maybe[..]);
+ }
+ }
+
+ fn handle_peer_list(self: &Arc<Self>, peers: &[Peer]) {
+ let to_connect = self.view.read().unwrap().should_try_list(peers);
+
+ for peer in to_connect.iter() {
+ tokio::spawn(self.clone().try_connect(*peer));
+ }
+ }
+
+ async fn try_connect(self: Arc<Self>, peer: Peer) {
+ {
+ let view = self.view.read().unwrap();
+ let mut attempts = self.current_attempts.write().unwrap();
+
+ if view.slots.iter().any(|x| x.peer == Some(peer)) {
+ return;
+ }
+ if attempts.contains(&peer) {
+ return;
+ }
+
+ attempts.insert(peer);
+ }
+ let res = self.netapp.clone().try_connect(peer.addr, peer.id).await;
+ trace!("Connection attempt to {}: {:?}", peer.addr, res);
+
+ self.current_attempts.write().unwrap().remove(&peer);
+
+ if res.is_err() {
+ self.backlog.write().unwrap().pop(&peer);
+ }
+ }
+
+ fn on_connected(self: &Arc<Self>, id: NodeID, addr: SocketAddr, is_incoming: bool) {
+ if is_incoming {
+ self.handle_peer_list(&[Peer { id, addr }][..]);
+ } else {
+ info!("KYEV C {} {}", hex::encode(id), addr);
+ let peer = Peer { id, addr };
+
+ let mut backlog = self.backlog.write().unwrap();
+ if backlog.get(&peer).is_none() {
+ backlog.put(peer, ());
+ }
+ drop(backlog);
+
+ let mut view = self.view.write().unwrap();
+ let prev_peers = view.current_peers();
+
+ view.update_all_slots(&[peer][..]);
+
+ let new_peers = view.current_peers();
+ drop(view);
+
+ self.close_all_diff(&prev_peers, &new_peers);
+ }
+ }
+
+ fn on_disconnected(&self, id: NodeID, is_incoming: bool) {
+ if !is_incoming {
+ info!("KYEV D {}", hex::encode(id));
+ self.view.write().unwrap().disconnected(id);
+ }
+ }
+
+ fn close_all_diff(&self, prev_peers: &HashSet<Peer>, new_peers: &HashSet<Peer>) {
+ for peer in prev_peers.iter() {
+ if !new_peers.contains(peer) {
+ self.netapp.disconnect(&peer.id);
+ }
+ }
+ }
+}
+
+fn rand_seed() -> Seed {
+ let mut seed = [0u8; 32];
+ sodiumoxide::randombytes::randombytes_into(&mut seed[..]);
+ seed
+}
diff --git a/src/peering/mod.rs b/src/peering/mod.rs
index 044b1df..15b870a 100644
--- a/src/peering/mod.rs
+++ b/src/peering/mod.rs
@@ -1 +1,2 @@
pub mod fullmesh;
+pub mod basalt;