defmodule Shard.Manager do
  @moduledoc """
  Keeps track of known peers and of the shards running on this node.

  Two named, protected ETS tables are owned by this process; other processes
  may read them directly:

  - `:peer_db` - set of `{peer_id, conn_pid | nil, ip, port}`. `conn_pid` is
    `nil` while no connection to the peer is open.
  - `:shard_db` - set of `{shard_id, manifest, shard_pid}`.

  Two private tables are referenced from the server state:

  - `:peer_shard_db` - multi-list (bag) of `{peer_id, shard_id}`, recording
    which peers declared interest in which local shards.
  - `:outbox` - multi-list (bag) of `{peer_id, message, queued_at}`, holding
    messages for peers that are not connected yet. `queued_at` is an OS
    timestamp in seconds; entries older than #{60} seconds are pruned whenever
    a new message is queued.
  """

  use GenServer

  require Logger

  # Maximum time, in seconds, a message may wait in the outbox.
  @outbox_ttl 60

  ## Client API

  @doc """
  Starts the manager and registers it under the module name.

  `my_port` is handed to every outgoing connection process that gets started.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(my_port) do
    GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
  end

  @doc """
  Asks the manager to open a connection to `ip`:`port`.

  The connection attempt runs asynchronously; this function returns at once.
  """
  @spec add_peer(:inet.ip_address() | :inet.hostname(), :inet.port_number()) :: :ok
  def add_peer(ip, port) do
    GenServer.cast(__MODULE__, {:add_peer, ip, port})
  end

  @doc """
  Sends `msg` to the peer identified by `peer_id`.

  If `:peer_db` lists an open connection, the message is cast straight to the
  connection process. Otherwise the manager is asked to connect to the peer
  and to queue the message in the meantime.
  """
  @spec send(term(), term()) :: :ok
  def send(peer_id, msg) do
    case :ets.lookup(:peer_db, peer_id) do
      [{^peer_id, conn_pid, _ip, _port}] when conn_pid != nil ->
        GenServer.cast(conn_pid, {:send_msg, msg})

      _ ->
        GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg})
    end
  end

  @doc """
  Delivers `msg`, received from `peer_id`, to the local shard `shard_id`.

  Returns `nil` without doing anything when the shard is not registered.
  """
  @spec dispatch(term(), term(), term()) :: :ok | nil
  def dispatch(peer_id, shard_id, msg) do
    case :ets.lookup(:shard_db, shard_id) do
      [{^shard_id, _manifest, shard_pid}] when shard_pid != nil ->
        GenServer.cast(shard_pid, {:msg, peer_id, msg})

      [_] ->
        # TODO restart shard
        nil

      [] ->
        # TODO send not interested
        nil
    end
  end

  ## Server callbacks

  @impl true
  def init(my_port) do
    :ets.new(:peer_db, [:set, :protected, :named_table])
    :ets.new(:shard_db, [:set, :protected, :named_table])
    peer_shard_db = :ets.new(:peer_shard_db, [:bag, :private])
    outbox = :ets.new(:outbox, [:bag, :private])

    {:ok, %{my_port: my_port, peer_shard_db: peer_shard_db, outbox: outbox}}
  end

  @impl true
  def handle_call({:find, shard_id}, _from, state) do
    reply =
      case :ets.lookup(:shard_db, shard_id) do
        [{^shard_id, _manifest, shard_pid}] -> {:ok, shard_pid}
        [] -> :not_found
      end

    {:reply, reply, state}
  end

  @impl true
  def handle_cast({:register, shard_id, manifest, pid}, state) do
    # A new registration wins only if nothing alive already holds this id.
    will_live =
      case :ets.lookup(:shard_db, shard_id) do
        [{^shard_id, _manifest, existing_pid}] -> not Process.alive?(existing_pid)
        _ -> true
      end

    if will_live do
      :ets.insert(:shard_db, {shard_id, manifest, pid})
    else
      GenServer.cast(pid, {:redundant, shard_id})
    end

    {:noreply, state}
  end

  def handle_cast({:interested, peer_id, shards}, state) do
    for shard_id <- shards do
      case :ets.lookup(:shard_db, shard_id) do
        [{^shard_id, _manifest, shard_pid}] ->
          :ets.insert(state.peer_shard_db, {peer_id, shard_id})
          GenServer.cast(shard_pid, {:interested, peer_id})

        [] ->
          nil
      end
    end

    {:noreply, state}
  end

  def handle_cast({:peer_up, pk, pid, ip, port}, state) do
    :ets.insert(:peer_db, {pk, pid, ip, port})

    # Flush everything that was queued while the peer was unreachable.
    for {_, msg, _} <- :ets.lookup(state.outbox, pk) do
      GenServer.cast(pid, {:send_msg, msg})
    end

    :ets.delete(state.outbox, pk)
    {:noreply, state}
  end

  def handle_cast({:peer_down, pk, ip, port}, state) do
    :ets.insert(:peer_db, {pk, nil, ip, port})
    {:noreply, state}
  end

  def handle_cast({:connect_and_send, peer_id, msg}, state) do
    case :ets.lookup(:peer_db, peer_id) do
      [{^peer_id, nil, ip, port}] ->
        connect_async(ip, port, state.my_port)
        now = System.os_time(:second)
        :ets.insert(state.outbox, {peer_id, msg, now})
        prune_outbox(state.outbox, now - @outbox_ttl)

      [{^peer_id, conn_pid, _ip, _port}] ->
        # The peer connected between the caller's lookup in send/2 and the
        # handling of this cast: deliver right away instead of dropping.
        GenServer.cast(conn_pid, {:send_msg, msg})

      _ ->
        Logger.debug("Dropping message for unknown peer #{inspect(peer_id)}")
    end

    {:noreply, state}
  end

  def handle_cast({:try_connect, pk_list}, state) do
    for pk <- pk_list do
      case :ets.lookup(:peer_db, pk) do
        [{^pk, nil, ip, port}] -> connect_async(ip, port, state.my_port)
        _ -> nil
      end
    end

    {:noreply, state}
  end

  def handle_cast({:add_peer, ip, port}, state) do
    connect_async(ip, port, state.my_port)
    {:noreply, state}
  end

  ## Internal helpers

  # Removes every outbox entry queued strictly before `cutoff`.
  # :ets.select_delete/2 only deletes objects for which the match-spec body
  # returns `true`, so the body must be `[true]`.
  defp prune_outbox(outbox, cutoff) do
    expired = [{{:_, :_, :"$1"}, [{:<, :"$1", cutoff}], [true]}]
    :ets.select_delete(outbox, expired)
  end

  # Opens a TCP connection to ip:port in a throw-away process so the manager
  # is never blocked by a slow connect, then hands the socket over to a
  # supervised SNet.TCPConn process.
  defp connect_async(ip, port, my_port) do
    spawn(fn ->
      case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do
        {:ok, client} ->
          {:ok, pid} =
            DynamicSupervisor.start_child(
              Shard.DynamicSupervisor,
              {SNet.TCPConn, %{socket: client, my_port: my_port}}
            )

          :ok = :gen_tcp.controlling_process(client, pid)

        _ ->
          Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped"
      end
    end)
  end
end