1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
|
defprotocol SNet.Group do
@moduledoc"""
A group is a specification of a bunch of peers we want and accept to talk to
about some things. It supports a number of abstract operations for finding peers,
broadcasting/gossiping, authenticating, etc.
"""
@doc"""
Find new peers for this group, open connections and notify us when connections are open.
Launches background processes if necessary, returns immediately.
"""
def init_lookup(group, notify_to)
@doc"""
Get all currently open connections to peers in this group.
"""
def get_connections(group)
@doc"""
Broadcast a message to peers of the group.
Will send to at most nmax peers, so this is a good primitive for gossip.
"""
def broadcast(group, msg, opts \\ [])
@doc"""
Check if a peer is allowed to participate in this group.
"""
def in_group?(group, conn_pid, auth)
end
defmodule SNet.PubShardGroup do
defstruct [:id]
defimpl SNet.Group do
def init_lookup(%SNet.PubShardGroup{id: id}, notify_to) do
# For now: ask all currently connected peers and connect to new peers we know of
spawn fn ->
for {_, pid, _, _} <- SNet.Manager.list_connections do
GenServer.cast(notify_to, {:peer_connected, pid})
end
for peer_info <- Shard.Manager.get_shard_peers id do
if SNet.Manager.get_connections_to peer_info == [] do
SNet.Manager.add_peer(peer_info,
callback: fn pid ->
GenServer.cast(notify_to, {:peer_connected, pid})
end)
end
end
end
# TODO: use a DHT to find peers
end
def get_connections(%SNet.PubShardGroup{id: id}) do
Shard.Manager.get_shard_peers(id)
|> Enum.map(&(SNet.Manager.get_connections_to(&1)))
|> Enum.filter(&(&1 != []))
|> Enum.map(fn [{pid, _auth}|_] -> pid end)
end
def broadcast(group, msg, opts) do
nmax = opts[:nmax] || 10
exclude_pid = opts[:exclude_pid] || []
%SNet.PubShardGroup{id: id} = group
nsent = get_connections(group)
|> Enum.filter(&(&1 not in exclude_pid))
|> Enum.shuffle
|> Enum.take(nmax)
|> Enum.map(&(GenServer.cast(&1, {:send_msg, msg})))
|> Enum.count
if nmax - nsent > 0 do
Shard.Manager.get_shard_peers(id)
|> Enum.filter(&(SNet.Manager.get_connections_to(&1) == []))
|> Enum.shuffle
|> Enum.take(nmax - nsent)
|> Enum.map(&(SNet.Manager.send(&1, msg)))
end
end
def in_group?(%SNet.PubShardGroup{id: _id}, _peer_pid, _auth) do
true # No access control
end
end
end
defmodule SNet.PrivGroup do
defstruct [:pk_list]
defimpl SNet.Group do
def init_lookup(%SNet.PrivGroup{pk_list: pk_list}, notify_to) do
spawn fn ->
# 1. We might already have some connections to these guys
for {_, pid, %SNet.Auth{my_pk: my_pk, his_pk: his_pk}, _} <- SNet.Manager.list_connections do
if (my_pk in pk_list) and (his_pk in pk_list) do
GenServer.cast(notify_to, {:peer_connected, pid})
end
end
# 2. We might also want to open some new connections to these guys
[my_pk|_] = Enum.filter(pk_list, &Shard.Keys.have_sk?/1)
for pk <- pk_list do
pid = SApp.Identity.find_proc(pk)
info = GenServer.call(pid, :get_info)
if Map.has_key?(info, :peer_info) do
for pi <- info.peer_info do
SNet.Manager.add_peer(pi,
auth: %SNet.Auth{my_pk: my_pk, his_pk: pk},
callback: fn pid ->
GenServer.cast(notify_to, {:peer_connected, pid})
end)
end
end
end
end
end
def get_connections(%SNet.PrivGroup{pk_list: pk_list}) do
for {_, pid, %SNet.Auth{my_pk: my_pk, his_pk: his_pk}, _} <- SNet.Manager.list_connections,
(my_pk in pk_list) and (his_pk in pk_list),
do: pid
end
def broadcast(group, msg, opts) do
nmax = opts[:nmax] || 10
exclude_pid = opts[:exclude_pid] || []
%SNet.PrivGroup{pk_list: pk_list} = group
nsent = get_connections(group)
|> Enum.filter(&(&1 not in exclude_pid))
|> Enum.shuffle
|> Enum.take(nmax)
|> Enum.map(&(GenServer.cast(&1, {:send_msg, msg})))
|> Enum.count
if nmax - nsent > 0 do
my_pks = Enum.filter(pk_list, &Shard.Keys.have_sk?/1)
[my_pk|_] = my_pks
candidates = for pk <- pk_list,
pid = SApp.Identity.find_proc(pk),
info = GenServer.call(pid, :get_info),
Map.has_key?(info, :peer_info),
xx <- info.peer_info,
do: {xx, pk}
candidates
|> Enum.filter(fn {peer_info, his_pk} ->
SNet.Manager.get_auth_connections_to(peer_info, my_pks, his_pk) == [] end)
|> Enum.shuffle()
|> Enum.take(nmax - nsent)
|> Enum.map(fn {peer_info, his_pk} ->
SNet.Manager.send_auth(peer_info, %SNet.Auth{my_pk: my_pk, his_pk: his_pk}, msg) end)
end
end
def in_group?(%SNet.PrivGroup{pk_list: pk_list}, _peer_pid, auth) do
case auth do
nil -> false
%SNet.Auth{my_pk: my_pk, his_pk: his_pk} ->
(my_pk in pk_list) and (his_pk in pk_list)
end
end
end
end
|