defmodule SNet.Manager do
  @moduledoc """
  Maintains a table `:connections` of currently connected peers.

  The table is a named, protected ETS `:bag` owned by this process. Each entry
  is a 4-tuple:

      {peer_info, pid, auth, status}

  where

    * `peer_info` identifies the remote peer (`{:inet, ip, port}`),
    * `pid` is the connection process started through `SNet.TCPConn`,
    * `auth` is `nil` or `%SNet.Auth{my_pk: my_pk, his_pk: his_pk}`,
    * `status` is `:establishing` (set when an outgoing connection is started)
      or `:established` (set when the connection reports `:peer_up`).

  Because the table is `:protected`, any process may read it directly (the
  lookup functions below do so), but only this server writes to it.
  """

  use GenServer

  require Logger

  def start_link(_) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @impl true
  def init(_) do
    # Connection processes are linked to this server; trapping exits turns
    # their termination into `{:EXIT, pid, reason}` messages (see handle_info/2).
    Process.flag(:trap_exit, true)
    :ets.new(:connections, [:bag, :protected, :named_table])
    {:ok, nil}
  end

  @impl true
  def handle_call({:add_peer, peer_info, auth, callback}, _from, state) do
    pid = add_peer_internal(peer_info, auth)

    # `pid` is nil when `peer_info` is a local address: there is no connection
    # process to hand the callback to in that case.
    if pid != nil and callback != nil do
      GenServer.cast(pid, {:callback, callback})
    end

    {:reply, pid, state}
  end

  def handle_call({:accept, client}, _from, state) do
    # Incoming connection: the entry is added to the table later, when the
    # connection process reports `:peer_up`.
    {:ok, pid} = SNet.TCPConn.start_link(%{socket: client, my_port: local_port()})
    {:reply, pid, state}
  end

  def handle_call({:peer_up, pid, peer_info, auth}, _from, state) do
    # NOTE(review): only the first matching entry is compared with `pid`; if
    # the caller's own entry comes first, other matching connections are not
    # considered. Confirm this is intended.
    case :ets.match(:connections, {peer_info, :"$1", auth_pattern(auth), :_}) do
      [[other_pid] | _] when other_pid != pid ->
        {:reply, :redundant, state}

      _ ->
        # Replace any previous entry for this pid with an established one.
        :ets.match_delete(:connections, {peer_info, pid, :_, :_})
        :ets.insert(:connections, {peer_info, pid, auth, :established})

        # An authenticated connection supersedes the unauthenticated
        # connections to the same peer: ask those to close.
        if auth != nil do
          for [unauth_pid] <- :ets.match(:connections, {peer_info, :"$1", nil, :_}) do
            GenServer.cast(unauth_pid, :close)
          end
        end

        # Send interested message for all our shards
        id_list = for {id, _, _} <- Shard.Manager.list_shards(), do: id
        GenServer.cast(pid, {:send_msg, {:interested, id_list}})

        {:reply, :ok, state}
    end
  end

  @impl true
  def handle_cast({:connect_and_send, peer_info, auth, msg}, state) do
    # add_peer_internal/2 returns nil for a local address; there is no
    # connection to deliver to in that case, so the message is dropped.
    case add_peer_internal(peer_info, auth) do
      nil -> :ok
      pid -> GenServer.cast(pid, {:send_msg, msg})
    end

    {:noreply, state}
  end

  @impl true
  def handle_info({:EXIT, pid, _reason}, state) do
    # A linked connection process terminated: forget all of its entries.
    :ets.match_delete(:connections, {:_, pid, :_, :_})
    {:noreply, state}
  end

  # Returns the pid of a connection to `peer_info` compatible with `auth`,
  # starting a new outgoing connection (registered as `:establishing`) when
  # none exists. Returns nil when `peer_info` is a local address.
  defp add_peer_internal(peer_info, auth) do
    if SNet.Addr.is_local?(peer_info) do
      nil
    else
      case :ets.match(:connections, {peer_info, :"$1", auth_pattern(auth), :_}) do
        [[pid] | _] ->
          pid

        [] ->
          {:ok, pid} =
            SNet.TCPConn.start_link(%{connect_to: peer_info, my_port: local_port(), auth: auth})

          :ets.insert(:connections, {peer_info, pid, auth, :establishing})
          pid
      end
    end
  end

  # ETS match pattern for the auth column: a nil requirement accepts any
  # connection (authenticated or not), otherwise the auth must match exactly.
  defp auth_pattern(nil), do: :_
  defp auth_pattern(auth), do: auth

  # Port configured for this node under the `:shard` application, `:port` key.
  defp local_port, do: Application.get_env(:shard, :port)

  # True when `pk` equals `spec`, or when `spec` is an enumerable that contains
  # `pk`. The Enumerable check keeps a single-key spec (e.g. a binary) from
  # raising Protocol.UndefinedError when the key does not match.
  defp key_matches?(pk, spec) do
    pk == spec or (Enumerable.impl_for(spec) != nil and pk in spec)
  end

  # =========
  # INTERFACE
  # =========

  @doc """
  Connect to a peer specified by ip address and port.

      peer_info := {:inet, ip, port}

  Options:

    * `:auth` - required authentication for the connection (`nil` if absent),
    * `:callback` - if given, forwarded to the connection process as a
      `{:callback, callback}` cast.

  Returns the pid of the connection process (an existing matching connection
  is reused), or `nil` when `peer_info` is a local address.
  """
  def add_peer(peer_info, opts \\ []) do
    GenServer.call(__MODULE__, {:add_peer, peer_info, opts[:auth], opts[:callback]})
  end

  @doc """
  Return the list of all entries of the `:connections` table, as
  `{peer_info, pid, auth, status}` tuples.
  """
  def list_connections() do
    :ets.tab2list(:connections)
  end

  @doc """
  Return the list of connections to a given peer, possibly with different auth,
  as `{pid, auth}` tuples.
  """
  def get_connections_to(peer_info) do
    for {^peer_info, pid, auth, _} <- :ets.lookup(:connections, peer_info), do: {pid, auth}
  end

  @doc """
  Return the list of pids of authenticated connections to a given peer that
  match a given auth spec.

  `my_auth` and `his_auth` are each either a single public key or a collection
  of acceptable public keys; a connection matches when its `my_pk` satisfies
  `my_auth` and its `his_pk` satisfies `his_auth`. Unauthenticated connections
  never match.
  """
  def get_auth_connections_to(peer_info, my_auth, his_auth) do
    for {^peer_info, pid, %SNet.Auth{my_pk: my_pk, his_pk: his_pk}, _} <-
          :ets.lookup(:connections, peer_info),
        key_matches?(my_pk, my_auth),
        key_matches?(his_pk, his_auth),
        do: pid
  end

  @doc """
  Send message to a peer specified by peer info.

  Uses the first connection found for that peer, whatever its auth. Opens a
  connection if necessary.
  """
  def send(peer_info, msg) do
    case :ets.lookup(:connections, peer_info) do
      [{^peer_info, pid, _auth, _} | _] ->
        GenServer.cast(pid, {:send_msg, msg})

      [] ->
        GenServer.cast(__MODULE__, {:connect_and_send, peer_info, nil, msg})
    end
  end

  @doc """
  Send message to a peer specified by peer info over authenticated channel.

  `auth` is a `SNet.Auth` struct describing the required authentication.

  Opens a connection if necessary.
  """
  def send_auth(peer_info, auth, msg) do
    case :ets.match(:connections, {peer_info, :"$1", auth, :_}) do
      [[pid] | _] ->
        GenServer.cast(pid, {:send_msg, msg})

      [] ->
        GenServer.cast(__MODULE__, {:connect_and_send, peer_info, auth, msg})
    end
  end

  @doc """
  Send message to a peer specified by the pid of its connection process.
  """
  def send_pid(pid, msg) do
    GenServer.cast(pid, {:send_msg, msg})
  end
end