defmodule SNet.Manager do
@moduledoc"""
- :connections (not persistent)
List of
{ peer_info, pid, nil | {my_pk, his_pk} }
"""
use GenServer
require Logger
def start_link(_) do
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
end
def init(_) do
Process.flag(:trap_exit, true)
:ets.new(:connections, [:bag, :protected, :named_table])
{:ok, nil}
end
def handle_call({:add_peer, peer_info, auth, callback}, _from, state) do
pid = add_peer_internal(peer_info, auth)
if callback != nil do
GenServer.cast(pid, {:callback, callback})
end
{:reply, pid, state}
end
def handle_call({:accept, client}, _from, state) do
my_port = Application.get_env(:shard, :port)
{:ok, pid} = SNet.TCPConn.start_link(%{socket: client, my_port: my_port})
{:reply, pid, state}
end
def handle_call({:peer_up, pid, peer_info, auth}, _from, state) do
case :ets.match(:connections, {peer_info, :'$1', (if auth != nil do auth else :_ end), :_}) do
[[pid2]|_] when pid2 != pid ->
{:reply, :redundant, state}
_ ->
:ets.match_delete(:connections, {peer_info, pid, :_, :_})
:ets.insert(:connections, {peer_info, pid, auth, :established})
if auth != nil do
for [pid3] <- :ets.match(:connections, {peer_info, :'$1', nil, :_}) do
GenServer.cast(pid3, :close)
end
end
# Send interested message for all our shards
id_list = (for {id, _, _} <- Shard.Manager.list_shards(), do: id)
GenServer.cast(pid, {:send_msg, {:interested, id_list}})
{:reply, :ok, state}
end
end
def handle_cast({:connect_and_send, peer_info, auth, msg}, state) do
pid = add_peer_internal(peer_info, auth)
GenServer.cast(pid, {:send_msg, msg})
{:noreply, state}
end
def handle_info({:EXIT, pid, _reason}, state) do
:ets.match_delete(:connections, {:_, pid, :_, :_})
{:noreply, state}
end
defp add_peer_internal(peer_info, auth) do
if SNet.Addr.is_local? peer_info do
nil
else
case :ets.match(:connections, {peer_info, :'$1', (if auth != nil do auth else :_ end), :_}) do
[[pid]|_] -> pid
[] ->
my_port = Application.get_env(:shard, :port)
{:ok, pid} = SNet.TCPConn.start_link(%{connect_to: peer_info, my_port: my_port, auth: auth})
:ets.insert(:connections, {peer_info, pid, auth, :establishing})
pid
end
end
end
# =========
# INTERFACE
# =========
@doc"""
Connect to a peer specified by ip address and port
peer_info := {:inet, ip, port}
"""
def add_peer(peer_info, opts \\ []) do
GenServer.call(__MODULE__, {:add_peer, peer_info, opts[:auth], opts[:callback]})
end
@doc"""
Return the list of all connected peers
"""
def list_connections() do
:ets.tab2list(:connections)
end
@doc"""
Return the list of connections to a given peer, possibly with different auth
"""
def get_connections_to(peer_info) do
for {^peer_info, pid, auth, _} <- :ets.lookup(:connections, peer_info), do: {pid, auth}
end
@doc"""
Return the list of connections to a given peer that match a given auth spec
"""
def get_auth_connections_to(peer_info, my_auth, his_auth) do
for {^peer_info, pid, %SNet.Auth{my_pk: my_pk, his_pk: his_pk}, _} <- :ets.lookup(:connections, peer_info),
my_pk == my_auth or my_pk in my_auth,
his_pk == his_auth or his_pk in his_auth,
do: pid
end
@doc"""
Send message to a peer specified by peer info.
Opens a connection if necessary.
"""
def send(peer_info, msg) do
case :ets.lookup(:connections, peer_info) do
[{^peer_info, pid, _auth, _}|_] ->
GenServer.cast(pid, {:send_msg, msg})
[] ->
GenServer.cast(__MODULE__, {:connect_and_send, peer_info, nil, msg})
end
end
def send_auth(peer_info, auth, msg) do
case :ets.match(:connections, {peer_info, :'$1', auth, :_}) do
[[pid]|_] ->
GenServer.cast(pid, {:send_msg, msg})
[] ->
GenServer.cast(__MODULE__, {:connect_and_send, peer_info, auth, msg})
end
end
@doc"""
Send message to a peer specified by peer id
"""
def send_pid(pid, msg) do
GenServer.cast(pid, {:send_msg, msg})
end
end