diff options
Diffstat (limited to 'shard/lib/manager.ex')
-rw-r--r-- | shard/lib/manager.ex | 59 |
1 files changed, 21 insertions, 38 deletions
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex index 3f5416d..08e14c7 100644 --- a/shard/lib/manager.ex +++ b/shard/lib/manager.ex @@ -34,7 +34,7 @@ defmodule Shard.Manager do Mult-list of { shard_id, peer_info } # TODO: add health info (last seen, ping, etc) - peer_info := {:inet4, ip, port} | {:inet6, ip, port} | {:onion, name} + peer_info := {:inet, ip, port} | {:inet6, ip, port} | {:onion, name} - :shard_procs (not persistent) @@ -67,11 +67,11 @@ defmodule Shard.Manager do @shard_state [Application.get_env(:shard, :data_path), "shard_state"] |> Path.join |> String.to_atom @peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom - def start_link(my_port) do - GenServer.start_link(__MODULE__, my_port, name: __MODULE__) + def start_link(_) do + GenServer.start_link(__MODULE__, nil, name: __MODULE__) end - def init(my_port) do + def init(_) do :dets.open_file(@shard_db, [type: :set]) for [{id, manifest, _pid}] <- :dets.match @shard_db, :"$1" do :dets.insert @shard_db, {id, manifest, nil} @@ -85,7 +85,7 @@ defmodule Shard.Manager do :ets.new(:connections, [:bag, :protected, :named_table]) outbox = :ets.new(:outbox, [:bag, :private]) - {:ok, %{my_port: my_port, outbox: outbox} } + {:ok, %{outbox: outbox} } end def handle_call({:register, shard_id, manifest, pid}, _from, state) do @@ -153,9 +153,11 @@ defmodule Shard.Manager do end def handle_cast({:connect_and_send, peer_info, msg}, state) do - case peer_info do - {:tcp4, ip, port} -> - add_peer(ip, port, state) + case :ets.lookup(:connections, peer_info) do + [{_, pid, _}|_] -> + GenServer.cast(pid, {:send_msg, msg}) + [] -> + add_peer(peer_info) currtime = System.os_time :second :ets.insert(state.outbox, {peer_info, msg, currtime}) outbox_cleanup = [ {{:_, :_, :'$1'}, [{:<, :'$1', currtime - 60}], [true]} ] @@ -164,39 +166,11 @@ defmodule Shard.Manager do {:noreply, state} end - def handle_cast({:try_connect, pk_list}, state) do - for pk <- pk_list do - case :dets.lookup(@peer_db, pk) do - [{^pk, nil, ip, port}] -> - add_peer(ip, port, state) - _ -> nil - end - end - {:noreply, state} - end - - def handle_cast({:add_peer, ip, port}, state) do - add_peer(ip, port, state) - {:noreply, state} - end - def handle_info({:DOWN, _, :process, pid, _}, state) do :ets.match_delete(:shard_procs, {:_, pid}) {:noreply, state} end - defp add_peer(ip, port, state) do - spawn fn -> - case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do - {:ok, client} -> - {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port, is_client: true, auth: nil}}) - :ok = :gen_tcp.controlling_process(client, pid) - _ -> - Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped" - end - end - end - # ====================== # CALLED BY SNet.TcpConn @@ -305,8 +279,17 @@ defmodule Shard.Manager do @doc""" Connect to a peer specified by ip address and port """ - def add_peer(ip, port) do - GenServer.cast(__MODULE__, {:add_peer, ip, port}) + def add_peer({:inet, ip, port}) do + spawn fn -> + case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do + {:ok, client} -> + my_port = Application.get_env(:shard, :port) + {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port, is_client: true, auth: nil}}) + :ok = :gen_tcp.controlling_process(client, pid) + _ -> + Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped" + end + end end @doc""" |