aboutsummaryrefslogtreecommitdiff
path: root/shard/lib/net/group.ex
blob: 692438aebca87cb5e44c27e6ba7c67d3bc52689d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
defprotocol SNet.Group do
  @moduledoc"""
  A group is a specification of a bunch of peers we want and accept to talk to
  about some things. It supports a number of abstract operations for finding peers,
  broadcasting/gossiping, authenticating, etc.
  """

  @doc"""
  Find new peers for this group, open connections and notify us when connections are open.

  Launches background processes if necessary, returns immediately.
  """
  def init_lookup(group, notify_to) 

  @doc"""
  Get all currently open connections to peers in this group.
  """
  def get_connections(group)

  @doc"""
  Broadcast a message to peers of the group.
  Will send to at most nmax peers, so this is a good primitive for gossip.
  """
  def broadcast(group, msg, nmax \\ 10)

  @doc"""
  Check if a peer is allowed to participate in this group.
  """
  def in_group?(group, conn_pid, auth)
end

defmodule SNet.PubShardGroup do
  defstruct [:id]

  defimpl SNet.Group do
    def init_lookup(%SNet.PubShardGroup{id: id}, _notify_to) do
      # For now: ask all currently connected peers and connect to new peers we know of
      spawn fn ->
        for {_, pid, _} <- SNet.Manager.list_connections do
          GenServer.cast(pid, {:send_msg, {:interested, [id]}})
        end
        for peer_info <- Shard.Manager.get_shard_peers id do
          if SNet.Manager.get_connections_to peer_info == [] do
            SNet.Manager.add_peer(peer_info)      # TODO callback when connected
          end
        end
      end
      # TODO: use a DHT to find peers
    end

    def get_connections(%SNet.PubShardGroup{id: id}) do
      Shard.Manager.get_shard_peers(id)
      |> Enum.map(&(SNet.Manager.get_connections_to(&1)))
      |> Enum.filter(&(&1 != []))
      |> Enum.map(fn [{pid, _auth}|_] -> pid end)
    end

    def broadcast(group, msg, nmax) do
      %SNet.PubShardGroup{id: id} = group
      nsent = get_connections(group)
      |> Enum.shuffle
      |> Enum.take(nmax)
      |> Enum.map(&(GenServer.cast(&1, {:send_msg, msg})))
      |> Enum.count
      if nmax - nsent > 0 do
        Shard.Manager.get_shard_peers(id)
        |> Enum.filter(&(SNet.Manager.get_connections_to(&1) == []))
        |> Enum.shuffle
        |> Enum.take(nmax - nsent)
        |> Enum.map(&(SNet.Manager.send(&1, msg)))
      end
    end

    def in_group?(%SNet.PubShardGroup{id: _id}, _peer_pid, _auth) do
      true     # No access control
    end
  end
end

defmodule SNet.PrivGroup do
  defstruct [:pk_list]

  defimpl SNet.Group do
    def init_lookup(%SNet.PubShardGroup{id: id}, notify_to) do
      # TODO
    end

    def get_connections(%SNet.PubShardGroup{id: id}) do
      # TODO
    end

    def broadcast(group, msg, nmax) do
    end

    def in_group?(%SNet.PubShardGroup{id: _id}, peer_pid, auth) do
      # TODO
    end
  end
end