defmodule Shard.Manager do
@moduledoc """
Maintains two important tables:
- :peer_db
List of
{ id, conn_pid | nil, ip, port }
(conn_pid is nil while the peer is not connected)
- :shard_db
List of
{ id, manifest, shard_pid }
And also some others:
- :peer_shard_db
Multi-list of
{ peer_id, shard_id }
- :outbox
Multi-list of
{ peer_id, message, timestamp }
(timestamp is the OS time, in seconds, at which the message was queued)
"""
use GenServer
require Logger
@doc """
Starts the manager process and registers it under the module name.

`my_port` is passed unchanged to `init/1` as the start argument.
"""
def start_link(my_port) do
  opts = [name: __MODULE__]
  GenServer.start_link(__MODULE__, my_port, opts)
end
# GenServer callback: creates the ETS tables and builds the initial state.
#
# `:peer_db` and `:shard_db` are named, protected `:set` tables, so other
# processes can read them by name. The peer/shard index and the outbox are
# private `:bag` tables reachable only through the table ids kept in the state.
def init(my_port) do
  shared_opts = [:set, :protected, :named_table]
  :ets.new(:peer_db, shared_opts)
  :ets.new(:shard_db, shared_opts)

  private_opts = [:bag, :private]

  state = %{
    my_port: my_port,
    peer_shard_db: :ets.new(:peer_shard_db, private_opts),
    outbox: :ets.new(:outbox, private_opts)
  }

  {:ok, state}
end
# Looks up the process registered for `shard_id` in `:shard_db`.
#
# Replies `{:ok, pid}` when an entry exists and `:not_found` otherwise.
# The server state is returned untouched.
def handle_call({:find, shard_id}, _from, state) do
  case :ets.lookup(:shard_db, shard_id) do
    [] ->
      {:reply, :not_found, state}

    [{^shard_id, _manifest, shard_pid}] ->
      {:reply, {:ok, shard_pid}, state}
  end
end
# Registers `pid` as the process serving `shard_id`.
#
# If `:shard_db` already holds an entry for this shard whose process is still
# alive, the existing entry wins and the newly announced `pid` is told it is
# `{:redundant, shard_id}`. Otherwise (no entry, or the recorded process is
# dead) the entry is written or overwritten with the new manifest and pid.
def handle_cast({:register, shard_id, manifest, pid}, state) do
  already_served? =
    case :ets.lookup(:shard_db, shard_id) do
      [{^shard_id, _old_manifest, existing_pid}] -> Process.alive?(existing_pid)
      _ -> false
    end

  if already_served? do
    GenServer.cast(pid, {:redundant, shard_id})
  else
    :ets.insert(:shard_db, {shard_id, manifest, pid})
  end

  {:noreply, state}
end
# Records that `peer_id` is interested in each of `shards`.
#
# For every shard id found in `:shard_db`, the `{peer_id, shard_id}` pair is
# added to the peer/shard table and the shard process is sent
# `{:interested, peer_id}`. Shard ids with no entry are skipped.
def handle_cast({:interested, peer_id, shards}, state) do
  Enum.each(shards, fn shard_id ->
    case :ets.lookup(:shard_db, shard_id) do
      [] ->
        nil

      [{^shard_id, _manifest, shard_pid}] ->
        :ets.insert(state.peer_shard_db, {peer_id, shard_id})
        GenServer.cast(shard_pid, {:interested, peer_id})
    end
  end)

  {:noreply, state}
end
# Marks peer `pk` as connected through connection process `pid`.
#
# The peer's `:peer_db` entry is (over)written, every message queued for it in
# the outbox is forwarded to the connection as `{:send_msg, msg}`, and the
# peer's outbox entries are then removed.
def handle_cast({:peer_up, pk, pid, ip, port}, state) do
  outbox = state.outbox
  :ets.insert(:peer_db, {pk, pid, ip, port})

  outbox
  |> :ets.lookup(pk)
  |> Enum.each(fn
    {_peer, queued_msg, _queued_at} -> GenServer.cast(pid, {:send_msg, queued_msg})
    # Entries of any other shape are skipped.
    _other -> :ok
  end)

  :ets.delete(outbox, pk)
  {:noreply, state}
end
# Marks peer `pk` as disconnected: its `:peer_db` entry is rewritten with a
# `nil` connection pid while keeping the given address.
def handle_cast({:peer_down, pk, ip, port}, state) do
  disconnected_entry = {pk, nil, ip, port}
  :ets.insert(:peer_db, disconnected_entry)

  {:noreply, state}
end
# Handles a message for `peer_id` that could not be delivered directly.
#
# * Peer known but disconnected: a connection attempt is started, the message
#   is queued in the outbox with the current time (seconds), and outbox entries
#   older than 60 seconds are purged.
# * Peer known and connected (it may have come up between the caller's lookup
#   and this cast being processed): the message is forwarded to the connection
#   process instead of being lost.
# * Peer unknown: the message is dropped and the drop is logged.
def handle_cast({:connect_and_send, peer_id, msg}, state) do
  case :ets.lookup(:peer_db, peer_id) do
    [{^peer_id, nil, ip, port}] ->
      add_peer(ip, port, state)
      now = System.os_time(:second)
      :ets.insert(state.outbox, {peer_id, msg, now})

      # `:ets.select_delete/2` removes an object only when the match spec body
      # evaluates to `true`; returning the timestamp would delete nothing.
      expired = [{{:_, :_, :"$1"}, [{:<, :"$1", now - 60}], [true]}]
      :ets.select_delete(state.outbox, expired)

    [{^peer_id, conn_pid, _ip, _port}] ->
      GenServer.cast(conn_pid, {:send_msg, msg})

    _ ->
      Logger.info("Dropping message for unknown peer #{inspect(peer_id)}")
  end

  {:noreply, state}
end
# Starts a connection attempt towards every peer in `pk_list` that has a
# `:peer_db` entry with a `nil` connection pid. Peers that are unknown or
# already have a connection pid are left alone.
def handle_cast({:try_connect, pk_list}, state) do
  Enum.each(pk_list, fn pk ->
    case :ets.lookup(:peer_db, pk) do
      [{^pk, nil, ip, port}] -> add_peer(ip, port, state)
      _other -> nil
    end
  end)

  {:noreply, state}
end
# Server side of `add_peer/2`: starts a connection attempt to `ip:port`.
# The result of the attempt is not awaited.
def handle_cast({:add_peer, ip, port}, state) do
  _connector = add_peer(ip, port, state)

  {:noreply, state}
end
# Spawns a process that tries to connect to `ip:port`, so the caller is not
# blocked while the TCP connection is being established. Returns the pid of
# the spawned process.
defp add_peer(ip, port, state) do
  spawn(fn -> connect_to_peer(ip, port, state) end)
end

# Opens the TCP socket and, on success, starts an `SNet.TCPConn` child under
# `Shard.DynamicSupervisor` and hands socket ownership over to it. A failed
# connection is only logged.
defp connect_to_peer(ip, port, state) do
  tcp_opts = [:binary, packet: 2, active: false]

  case :gen_tcp.connect(ip, port, tcp_opts) do
    {:ok, socket} ->
      child = {SNet.TCPConn, %{socket: socket, my_port: state.my_port}}
      {:ok, conn_pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, child)
      :ok = :gen_tcp.controlling_process(socket, conn_pid)

    _ ->
      Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped"
  end
end
@doc """
Asks the manager to start a connection attempt to `ip:port`.

This is an asynchronous cast to the process registered under this module's
name; it returns `:ok` immediately.
"""
def add_peer(ip, port) do
  request = {:add_peer, ip, port}
  GenServer.cast(__MODULE__, request)
end
@doc """
Sends `msg` to the peer identified by `peer_id`.

If `:peer_db` holds a non-`nil` connection pid for the peer, the message is
cast straight to that process as `{:send_msg, msg}`. Otherwise the manager is
asked to handle it via `{:connect_and_send, peer_id, msg}`.
"""
def send(peer_id, msg) do
  conn_pid =
    case :ets.lookup(:peer_db, peer_id) do
      [{^peer_id, pid, _ip, _port}] -> pid
      _ -> nil
    end

  if conn_pid == nil do
    GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg})
  else
    GenServer.cast(conn_pid, {:send_msg, msg})
  end
end
@doc """
Routes `msg`, received from `peer_id`, to the process serving `shard_id`.

When `:shard_db` has an entry with a non-`nil` pid, the message is cast to it
as `{:msg, peer_id, msg}`. In every other case nothing is sent and `nil` is
returned.
"""
def dispatch(peer_id, shard_id, msg) do
  case :ets.lookup(:shard_db, shard_id) do
    # TODO send not interested
    [] ->
      nil

    # TODO restart shard
    [{^shard_id, _manifest, nil}] ->
      nil

    [{^shard_id, _manifest, shard_pid}] ->
      GenServer.cast(shard_pid, {:msg, peer_id, msg})

    # TODO restart shard
    [_] ->
      nil
  end
end
end