diff options
author | Alex Auvolat <alex@adnab.me> | 2018-10-11 14:22:00 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2018-10-11 14:22:00 +0200 |
commit | eb8c949551ffb8b3600357d7ff2bebe750af96e5 (patch) | |
tree | 36a0b46219ae595313b569c5885c470a0ddaca0e /shard/lib/net/group.ex | |
parent | 7b6042205e7c6135fae4e0d21dbf7a5975e8491b (diff) | |
download | shard-eb8c949551ffb8b3600357d7ff2bebe750af96e5.tar.gz shard-eb8c949551ffb8b3600357d7ff2bebe750af96e5.zip |
Address, broadcast group management
Diffstat (limited to 'shard/lib/net/group.ex')
-rw-r--r-- | shard/lib/net/group.ex | 98 |
1 files changed, 98 insertions, 0 deletions
diff --git a/shard/lib/net/group.ex b/shard/lib/net/group.ex new file mode 100644 index 0000000..d2c2537 --- /dev/null +++ b/shard/lib/net/group.ex @@ -0,0 +1,98 @@ +defprotocol SNet.Group do + @moduledoc""" + A group is a specification of a bunch of peers we want and accept to talk to + about some things. It supports a number of abstract operations for finding peers, + broadcasting/gossiping, authenticating, etc. + """ + + @doc""" + Find new peers for this group, open connections and notify us when connections are open. + + Launches background processes if necessary, returns immediately. + """ + def init_lookup(group, notify_to) + + @doc""" + Get all currently open connections to peers in this group. + """ + def get_connections(group) + + @doc""" + Broadcast a message to peers of the group. + Will send to at most nmax peers, so this is a good primitive for gossip. + """ + def broadcast(group, msg, nmax \\ 10) + + @doc""" + Check if a peer is allowed to participate in this group. + """ + def in_group?(group, conn_pid, auth) +end + +defmodule SNet.PubShardGroup do + defstruct [:id] + + defimpl SNet.Group do + def init_lookup(%SNet.PubShardGroup{id: id}, _notify_to) do + # For now: ask all currently connected peers and connect to new peers we know of + spawn fn -> + for {_, pid, _} <- Shard.Manager.list_connections do + GenServer.cast(pid, {:send_msg, {:interested, [id]}}) + end + for peer_info <- Shard.Manager.get_shard_peers id do + if Shard.Manager.get_connections_to peer_info == [] do + Shard.Manager.add_peer(peer_info) # TODO callback when connected + end + end + end + # TODO: use a DHT to find peers + end + + def get_connections(%SNet.PubShardGroup{id: id}) do + for peer_info <- Shard.Manager.get_shard_peers(id), + [{pid, _auth}|_] = Shard.Manager.get_connections_to(peer_info), + do: pid + end + + def broadcast(group, msg, nmax) do + %SNet.PubShardGroup{id: id} = group + nsent = get_connections(group) + |> Enum.shuffle + |> Enum.take(nmax) + |> Enum.map(&(GenServer.cast(&1, {:send_msg, msg}))) + |> Enum.count + if nmax - nsent > 0 do + Shard.Manager.get_shard_peers(id) + |> Enum.filter(&(Shard.Manager.get_connections_to(&1) == [])) + |> Enum.shuffle + |> Enum.take(nmax - nsent) + |> Enum.map(&(Shard.Manager.send(&1, msg))) + end + end + + def in_group?(%SNet.PubShardGroup{id: _id}, _peer_pid, _auth) do + true # No access control + end + end +end + +defmodule SNet.PrivGroup do + defstruct [:pk_list] + + defimpl SNet.Group do + def init_lookup(%SNet.PubShardGroup{id: id}, notify_to) do + # TODO + end + + def get_connections(%SNet.PubShardGroup{id: id}) do + # TODO + end + + def broadcast(group, msg, nmax) do + end + + def in_group?(%SNet.PubShardGroup{id: _id}, peer_pid, auth) do + # TODO + end + end +end |