aboutsummaryrefslogtreecommitdiff
path: root/shard/lib/net/group.ex
blob: 7086c2d398b04f6d8d6efd4dec0f11f867c02b3a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
defprotocol SNet.Group do
  @moduledoc"""
  A group is a specification of a bunch of peers we want and accept to talk to
  about some things. It supports a number of abstract operations for finding peers,
  broadcasting/gossiping, authenticating, etc.
  """

  @doc"""
  Find new peers for this group, open connections and notify us when connections are open.

  Launches background processes if necessary, returns immediately.
  """
  def init_lookup(group, notify_to) 

  @doc"""
  Get all currently open connections to peers in this group.
  """
  def get_connections(group)

  @doc"""
  Broadcast a message to peers of the group.
  Will send to at most nmax peers, so this is a good primitive for gossip.
  """
  def broadcast(group, msg, nmax \\ 10)

  @doc"""
  Check if a peer is allowed to participate in this group.
  """
  def in_group?(group, conn_pid, auth)
end

defmodule SNet.PubShardGroup do
  defstruct [:id]

  defimpl SNet.Group do
    def init_lookup(%SNet.PubShardGroup{id: id}, notify_to) do
      # For now: ask all currently connected peers and connect to new peers we know of
      spawn fn ->
        for {_, pid, _} <- SNet.Manager.list_connections do
          GenServer.cast(notify_to, {:peer_connected, pid})
        end
        for peer_info <- Shard.Manager.get_shard_peers id do
          if SNet.Manager.get_connections_to peer_info == [] do
            SNet.Manager.add_peer(peer_info,
              callback: fn pid ->
                GenServer.cast(notify_to, {:peer_connected, pid})
              end)
          end
        end
      end
      # TODO: use a DHT to find peers
    end

    def get_connections(%SNet.PubShardGroup{id: id}) do
      Shard.Manager.get_shard_peers(id)
      |> Enum.map(&(SNet.Manager.get_connections_to(&1)))
      |> Enum.filter(&(&1 != []))
      |> Enum.map(fn [{pid, _auth}|_] -> pid end)
    end

    def broadcast(group, msg, nmax) do
      %SNet.PubShardGroup{id: id} = group
      nsent = get_connections(group)
      |> Enum.shuffle
      |> Enum.take(nmax)
      |> Enum.map(&(GenServer.cast(&1, {:send_msg, msg})))
      |> Enum.count
      if nmax - nsent > 0 do
        Shard.Manager.get_shard_peers(id)
        |> Enum.filter(&(SNet.Manager.get_connections_to(&1) == []))
        |> Enum.shuffle
        |> Enum.take(nmax - nsent)
        |> Enum.map(&(SNet.Manager.send(&1, msg)))
      end
    end

    def in_group?(%SNet.PubShardGroup{id: _id}, _peer_pid, _auth) do
      true     # No access control
    end
  end
end

defmodule SNet.PrivGroup do
  defstruct [:pk_list]

  defimpl SNet.Group do
    def init_lookup(%SNet.PrivGroup{pk_list: pk_list}, notify_to) do
      spawn fn ->
        # 1. We might already have some connections to these guys
        for {_, pid, %SNet.Auth{my_pk: my_pk, his_pk: his_pk}} <- SNet.Manager.list_connections do
          if (my_pk in pk_list) and (his_pk in pk_list) do
            GenServer.cast(notify_to, {:peer_connected, pid})
          end
        end
        # 2. We might also want to open some new connections to these guys
        [my_pk|_] = Enum.filter(pk_list, &Shard.Keys.have_sk?/1)
        for pk <- pk_list do
          pid = SApp.Identity.find_proc(pk)
          info = GenServer.call(pid, :get_info)
          if Map.has_key?(info, :peer_info) do
            for pi <- info.peer_info do
              SNet.Manager.add_peer(pi,
                auth: %SNet.Auth{my_pk: my_pk, his_pk: pk},
                callback: fn pid ->
                  GenServer.cast(notify_to, {:peer_connected, pid})
                end)
            end
          end
        end
      end
    end

    def get_connections(%SNet.PrivGroup{pk_list: pk_list}) do
      for {_, pid, %SNet.Auth{my_pk: my_pk, his_pk: his_pk}} <- SNet.Manager.list_connections,
        (my_pk in pk_list) and (his_pk in pk_list),
      do: pid
    end

    def broadcast(group, msg, nmax) do
      %SNet.PrivGroup{pk_list: pk_list} = group
      nsent = get_connections(group)
      |> Enum.shuffle
      |> Enum.take(nmax)
      |> Enum.map(&(GenServer.cast(&1, {:send_msg, msg})))
      |> Enum.count
      if nmax - nsent > 0 do
        my_pks = Enum.filter(pk_list, &Shard.Keys.have_sk?/1)
        [my_pk|_] = my_pks
        candidates = for pk <- pk_list,
          pid = SApp.Identity.find_proc(pk),
          info = GenServer.call(pid, :get_info),
          Map.has_key?(info, :peer_info),
          xx <- info.peer_info,
          do: {xx, pk}
        candidates
        |> Enum.filter(fn {peer_info, his_pk} ->
            SNet.Manager.get_auth_connections_to(peer_info, my_pks, his_pk) == [] end)
        |> Enum.shuffle()
        |> Enum.take(nmax - nsent)
        |> Enum.map(fn {peer_info, his_pk} ->
            SNet.Manager.send_auth(peer_info, %SNet.Auth{my_pk: my_pk, his_pk: his_pk}, msg) end)
      end
    end

    def in_group?(%SNet.PrivGroup{pk_list: pk_list}, _peer_pid, auth) do
      case auth do
        nil -> false
        %SNet.Auth{my_pk: my_pk, his_pk: his_pk} ->
          (my_pk in pk_list) and (his_pk in pk_list)
      end
    end
  end
end