From 1646bc57eae9880fd408d23ca692364dc6fd6442 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Oct 2018 15:11:52 +0200 Subject: Move somme functionnality to SNet.Manager --- shard/lib/net/manager.ex | 119 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 shard/lib/net/manager.ex (limited to 'shard/lib/net/manager.ex') diff --git a/shard/lib/net/manager.ex b/shard/lib/net/manager.ex new file mode 100644 index 0000000..17d6e06 --- /dev/null +++ b/shard/lib/net/manager.ex @@ -0,0 +1,119 @@ +defmodule SNet.Manager do + @moduledoc""" + - :connections (not persistent) + + List of + { peer_info, pid, nil | {my_pk, his_pk} } + """ + + use GenServer + + require Logger + + def start_link(_) do + GenServer.start_link(__MODULE__, nil, name: __MODULE__) + end + + def init(_) do + Process.flag(:trap_exit, true) + + :ets.new(:connections, [:bag, :protected, :named_table]) + + {:ok, nil} + end + + def handle_call({:add_peer, peer_info}, _from, state) do + pid = add_peer_internal(peer_info) + {:reply, pid, state} + end + + def handle_call({:accept, client}, _from, state) do + my_port = Application.get_env(:shard, :port) + {:ok, pid} = SNet.TCPConn.start_link(%{socket: client, my_port: my_port}) + {:reply, pid, state} + end + + def handle_call({:peer_up, pid, peer_info, auth}, _from, state) do + case :ets.match(:connections, {peer_info, :_, auth}) do + [{_, pid2, _}] when pid2 != pid -> + {:reply, :redundant, state} + _ -> + :ets.insert(:connections, {peer_info, pid, auth}) + + # Send interested message for all our shards + id_list = (for {id, _, _} <- Shard.Manager.list_shards(), do: id) + GenServer.cast(pid, {:send_msg, {:interested, id_list}}) + + {:reply, :ok, state} + end + end + + def handle_cast({:connect_and_send, peer_info, msg}, state) do + pid = add_peer_internal(peer_info) + GenServer.cast(pid, {:send_msg, msg}) + {:noreply, state} + end + + def handle_info({:EXIT, pid, _reason}, state) do + :ets.match_delete(:connections, {:_, pid, :_}) + {:noreply, state} + end + + defp add_peer_internal(peer_info) do + case :ets.lookup(:connections, peer_info) do + [{_, pid, _}|_] -> + pid + [] -> + my_port = Application.get_env(:shard, :port) + {:ok, pid} = SNet.TCPConn.start_link(%{connect_to: peer_info, my_port: my_port, auth: nil}) + :ets.insert(:connections, {peer_info, pid, nil}) + pid + end + end + + # ========= + # INTERFACE + # ========= + + @doc""" + Connect to a peer specified by ip address and port + """ + def add_peer(peer_info) do + GenServer.call(__MODULE__, {:add_peer, peer_info}) + end + + @doc""" + Return the list of all connected peers + """ + def list_connections() do + for [x] <- :ets.match(:connections, :"$1"), do: x + end + + @doc""" + Return the list of connections to a given peer, possibly with different auth + """ + def get_connections_to(peer_info) do + for {^peer_info, pid, auth} <- :ets.lookup(:connections, peer_info), do: {pid, auth} + end + + @doc""" + Send message to a peer specified by peer info. + Opens a connection if necessary. + """ + def send(peer_info, msg) do + case :ets.lookup(:connections, peer_info) do + [{^peer_info, pid, _auth}|_] -> + GenServer.cast(pid, {:send_msg, msg}) + [] -> + GenServer.cast(__MODULE__, {:connect_and_send, peer_info, msg}) + end + end + + @doc""" + Send message to a peer specified by peer id + """ + def send_pid(pid, msg) do + GenServer.cast(pid, {:send_msg, msg}) + end +end + -- cgit v1.2.3