1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
defmodule SNet.Manager do
@moduledoc"""
- :connections (not persistent)
List of
{ peer_info, pid, nil | {my_pk, his_pk} }
"""
use GenServer
require Logger
def start_link(_) do
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
end
def init(_) do
Process.flag(:trap_exit, true)
:ets.new(:connections, [:bag, :protected, :named_table])
{:ok, nil}
end
def handle_call({:add_peer, peer_info, auth, callback}, _from, state) do
pid = add_peer_internal(peer_info, auth)
if callback != nil do
GenServer.cast(pid, {:callback, callback})
end
{:reply, pid, state}
end
def handle_call({:accept, client}, _from, state) do
my_port = Application.get_env(:shard, :port)
{:ok, pid} = SNet.TCPConn.start_link(%{socket: client, my_port: my_port})
{:reply, pid, state}
end
def handle_call({:peer_up, pid, peer_info, auth}, _from, state) do
case :ets.match(:connections, {peer_info, :'$1', auth}) do
[[pid2]|_] when pid2 != pid ->
{:reply, :redundant, state}
_ ->
:ets.match_delete(:connections, {peer_info, pid, :_})
:ets.insert(:connections, {peer_info, pid, auth})
# Send interested message for all our shards
id_list = (for {id, _, _} <- Shard.Manager.list_shards(), do: id)
GenServer.cast(pid, {:send_msg, {:interested, id_list}})
{:reply, :ok, state}
end
end
def handle_cast({:connect_and_send, peer_info, auth, msg}, state) do
pid = add_peer_internal(peer_info, auth)
GenServer.cast(pid, {:send_msg, msg})
{:noreply, state}
end
def handle_info({:EXIT, pid, _reason}, state) do
:ets.match_delete(:connections, {:_, pid, :_})
{:noreply, state}
end
defp add_peer_internal(peer_info, auth) do
if SNet.Addr.is_local? peer_info do
nil
else
case :ets.match(:connections, {peer_info, :'$1', (if auth != nil do auth else :_ end)}) do
[[pid]|_] -> pid
[] ->
my_port = Application.get_env(:shard, :port)
{:ok, pid} = SNet.TCPConn.start_link(%{connect_to: peer_info, my_port: my_port, auth: auth})
:ets.insert(:connections, {peer_info, pid, auth})
pid
end
end
end
# =========
# INTERFACE
# =========
@doc"""
Connect to a peer specified by ip address and port
"""
def add_peer(peer_info, opts \\ []) do
GenServer.call(__MODULE__, {:add_peer, peer_info, opts[:auth], opts[:callback]})
end
@doc"""
Return the list of all connected peers
"""
def list_connections() do
:ets.tab2list(:connections)
end
@doc"""
Return the list of connections to a given peer, possibly with different auth
"""
def get_connections_to(peer_info) do
for {^peer_info, pid, auth} <- :ets.lookup(:connections, peer_info), do: {pid, auth}
end
@doc"""
Return the list of connections to a given peer that match a given auth spec
"""
def get_auth_connections_to(peer_info, my_auth, his_auth) do
for {^peer_info, pid, %SNet.Auth{my_pk: my_pk, his_pk: his_pk}} <- :ets.lookup(:connections, peer_info),
my_pk == my_auth or my_pk in my_auth,
his_pk == his_auth or his_pk in his_auth,
do: pid
end
@doc"""
Send message to a peer specified by peer info.
Opens a connection if necessary.
"""
def send(peer_info, msg) do
case :ets.lookup(:connections, peer_info) do
[{^peer_info, pid, _auth}|_] ->
GenServer.cast(pid, {:send_msg, msg})
[] ->
GenServer.cast(__MODULE__, {:connect_and_send, peer_info, nil, msg})
end
end
def send_auth(peer_info, auth, msg) do
case :ets.match(:connections, {peer_info, :'$1', auth}) do
[[pid]|_] ->
GenServer.cast(pid, {:send_msg, msg})
[] ->
GenServer.cast(__MODULE__, {:connect_and_send, peer_info, auth, msg})
end
end
@doc"""
Send message to a peer specified by peer id
"""
def send_pid(pid, msg) do
GenServer.cast(pid, {:send_msg, msg})
end
end
|