aboutsummaryrefslogtreecommitdiff
path: root/shard/lib/manager.ex
diff options
context:
space:
mode:
Diffstat (limited to 'shard/lib/manager.ex')
-rw-r--r--shard/lib/manager.ex59
1 files changed, 21 insertions, 38 deletions
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex
index 3f5416d..08e14c7 100644
--- a/shard/lib/manager.ex
+++ b/shard/lib/manager.ex
@@ -34,7 +34,7 @@ defmodule Shard.Manager do
Mult-list of
{ shard_id, peer_info } # TODO: add health info (last seen, ping, etc)
- peer_info := {:inet4, ip, port} | {:inet6, ip, port} | {:onion, name}
+ peer_info := {:inet, ip, port} | {:inet6, ip, port} | {:onion, name}
- :shard_procs (not persistent)
@@ -67,11 +67,11 @@ defmodule Shard.Manager do
@shard_state [Application.get_env(:shard, :data_path), "shard_state"] |> Path.join |> String.to_atom
@peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom
- def start_link(my_port) do
- GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
+ def start_link(_) do
+ GenServer.start_link(__MODULE__, nil, name: __MODULE__)
end
- def init(my_port) do
+ def init(_) do
:dets.open_file(@shard_db, [type: :set])
for [{id, manifest, _pid}] <- :dets.match @shard_db, :"$1" do
:dets.insert @shard_db, {id, manifest, nil}
@@ -85,7 +85,7 @@ defmodule Shard.Manager do
:ets.new(:connections, [:bag, :protected, :named_table])
outbox = :ets.new(:outbox, [:bag, :private])
- {:ok, %{my_port: my_port, outbox: outbox} }
+ {:ok, %{outbox: outbox} }
end
def handle_call({:register, shard_id, manifest, pid}, _from, state) do
@@ -153,9 +153,11 @@ defmodule Shard.Manager do
end
def handle_cast({:connect_and_send, peer_info, msg}, state) do
- case peer_info do
- {:tcp4, ip, port} ->
- add_peer(ip, port, state)
+ case :ets.lookup(:connections, peer_info) do
+ [{_, pid, _}|_] ->
+ GenServer.cast(pid, {:send_msg, msg})
+ [] ->
+ add_peer(peer_info)
currtime = System.os_time :second
:ets.insert(state.outbox, {peer_info, msg, currtime})
outbox_cleanup = [ {{:_, :_, :'$1'}, [{:<, :'$1', currtime - 60}], [true]} ]
@@ -164,39 +166,11 @@ defmodule Shard.Manager do
{:noreply, state}
end
- def handle_cast({:try_connect, pk_list}, state) do
- for pk <- pk_list do
- case :dets.lookup(@peer_db, pk) do
- [{^pk, nil, ip, port}] ->
- add_peer(ip, port, state)
- _ -> nil
- end
- end
- {:noreply, state}
- end
-
- def handle_cast({:add_peer, ip, port}, state) do
- add_peer(ip, port, state)
- {:noreply, state}
- end
-
def handle_info({:DOWN, _, :process, pid, _}, state) do
:ets.match_delete(:shard_procs, {:_, pid})
{:noreply, state}
end
- defp add_peer(ip, port, state) do
- spawn fn ->
- case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do
- {:ok, client} ->
- {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port, is_client: true, auth: nil}})
- :ok = :gen_tcp.controlling_process(client, pid)
- _ ->
- Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped"
- end
- end
- end
-
# ======================
# CALLED BY SNet.TcpConn
@@ -305,8 +279,17 @@ defmodule Shard.Manager do
@doc"""
Connect to a peer specified by ip address and port
"""
- def add_peer(ip, port) do
- GenServer.cast(__MODULE__, {:add_peer, ip, port})
+ def add_peer({:inet, ip, port}) do
+ spawn fn ->
+ case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do
+ {:ok, client} ->
+ my_port = Application.get_env(:shard, :port)
+ {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port, is_client: true, auth: nil}})
+ :ok = :gen_tcp.controlling_process(client, pid)
+ _ ->
+ Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped"
+ end
+ end
end
@doc"""