aboutsummaryrefslogtreecommitdiff
path: root/shard/lib/net/group.ex
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2018-10-11 14:22:00 +0200
committerAlex Auvolat <alex@adnab.me>2018-10-11 14:22:00 +0200
commiteb8c949551ffb8b3600357d7ff2bebe750af96e5 (patch)
tree36a0b46219ae595313b569c5885c470a0ddaca0e /shard/lib/net/group.ex
parent7b6042205e7c6135fae4e0d21dbf7a5975e8491b (diff)
downloadshard-eb8c949551ffb8b3600357d7ff2bebe750af96e5.tar.gz
shard-eb8c949551ffb8b3600357d7ff2bebe750af96e5.zip
Address, broadcast group management
Diffstat (limited to 'shard/lib/net/group.ex')
-rw-r--r--shard/lib/net/group.ex98
1 files changed, 98 insertions, 0 deletions
diff --git a/shard/lib/net/group.ex b/shard/lib/net/group.ex
new file mode 100644
index 0000000..d2c2537
--- /dev/null
+++ b/shard/lib/net/group.ex
@@ -0,0 +1,98 @@
+defprotocol SNet.Group do
+ @moduledoc"""
+ A group is a specification of a bunch of peers we want and accept to talk to
+ about some things. It supports a number of abstract operations for finding peers,
+ broadcasting/gossiping, authenticating, etc.
+ """
+
+ @doc"""
+ Find new peers for this group, open connections and notify us when connections are open.
+
+ Launches background processes if necessary, returns immediately.
+ """
+ def init_lookup(group, notify_to)
+
+ @doc"""
+ Get all currently open connections to peers in this group.
+ """
+ def get_connections(group)
+
+ @doc"""
+ Broadcast a message to peers of the group.
+ Will send to at most nmax peers, so this is a good primitive for gossip.
+ """
+ def broadcast(group, msg, nmax \\ 10)
+
+ @doc"""
+ Check if a peer is allowed to participate in this group.
+ """
+ def in_group?(group, conn_pid, auth)
+end
+
+defmodule SNet.PubShardGroup do
+ defstruct [:id]
+
+ defimpl SNet.Group do
+ def init_lookup(%SNet.PubShardGroup{id: id}, _notify_to) do
+ # For now: ask all currently connected peers and connect to new peers we know of
+ spawn fn ->
+ for {_, pid, _} <- Shard.Manager.list_connections do
+ GenServer.cast(pid, {:send_msg, {:interested, [id]}})
+ end
+ for peer_info <- Shard.Manager.get_shard_peers id do
+ if Shard.Manager.get_connections_to peer_info == [] do
+ Shard.Manager.add_peer(peer_info) # TODO callback when connected
+ end
+ end
+ end
+ # TODO: use a DHT to find peers
+ end
+
+ def get_connections(%SNet.PubShardGroup{id: id}) do
+ for peer_info <- Shard.Manager.get_shard_peers(id),
+ [{pid, _auth}|_] = Shard.Manager.get_connections_to(peer_info),
+ do: pid
+ end
+
+ def broadcast(group, msg, nmax) do
+ %SNet.PubShardGroup{id: id} = group
+ nsent = get_connections(group)
+ |> Enum.shuffle
+ |> Enum.take(nmax)
+ |> Enum.map(&(GenServer.cast(&1, {:send_msg, msg})))
+ |> Enum.count
+ if nmax - nsent > 0 do
+ Shard.Manager.get_shard_peers(id)
+ |> Enum.filter(&(Shard.Manager.get_connections_to(&1) == []))
+ |> Enum.shuffle
+ |> Enum.take(nmax - nsent)
+ |> Enum.map(&(Shard.Manager.send(&1, msg)))
+ end
+ end
+
+ def in_group?(%SNet.PubShardGroup{id: _id}, _peer_pid, _auth) do
+ true # No access control
+ end
+ end
+end
+
+defmodule SNet.PrivGroup do
+ defstruct [:pk_list]
+
+ defimpl SNet.Group do
+ def init_lookup(%SNet.PubShardGroup{id: id}, notify_to) do
+ # TODO
+ end
+
+ def get_connections(%SNet.PubShardGroup{id: id}) do
+ # TODO
+ end
+
+ def broadcast(group, msg, nmax) do
+ end
+
+ def in_group?(%SNet.PubShardGroup{id: _id}, peer_pid, auth) do
+ # TODO
+ end
+ end
+end