# Protocol dispatched on a manifest value; it declares the single operation
# needed to bring up the shard process that the manifest describes.
defprotocol Shard.Manifest do
@doc "Start the corresponding Shard process"
# Shard.Manager.init/1 calls this from a spawned process for every manifest
# found in the shard database and does not use the return value.
def start(manifest)
end
defmodule Shard.Manager do
@moduledoc"""
Maintains several important tables :
- :peer_db
List of
{ id, pid | nil, ip, port }
- :shard_db
List of
{ id, manifest, pid | nil }
- :shard_procs
List of
{ {id, path}, pid }
- :shard_peer_db
Multi-list of
{ shard_id, peer_id }
And an internal table :
- :outbox
Multi-list of
{ peer_id, message, time_inserted }
"""
use GenServer
require Logger
# Names of the three DETS tables. Each one is the :data_path setting of the
# :shard application joined with a fixed file name and converted to an atom;
# that atom is what gets passed to :dets.open_file/2 and to every later
# :dets call in this module.
# Module attributes are evaluated when the module is compiled, so :data_path
# is read at compile time, not when the server starts.
@peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom
@shard_db [Application.get_env(:shard, :data_path), "shard_db"] |> Path.join |> String.to_atom
@shard_peer_db [Application.get_env(:shard, :data_path), "shard_peer_db"] |> Path.join |> String.to_atom
@doc """
Starts the manager and registers it under this module's name.

`my_port` is passed unchanged to `init/1`, which stores it in the server state.
"""
def start_link(my_port) do
  server_opts = [name: __MODULE__]
  GenServer.start_link(__MODULE__, my_port, server_opts)
end
# GenServer callback: opens the persistent DETS tables, clears process
# references left over from a previous run and creates the in-memory ETS
# tables.
#
# - Every stored peer has its pid reset to nil and a connection is requested
#   through add_peer/2 (which casts to this server).
# - Every stored shard has its pid reset to nil and is restarted from its
#   manifest in a separate process.
#
# A DETS table that cannot be opened aborts start-up with a MatchError
# instead of being silently ignored.
def init(my_port) do
  {:ok, _} = :dets.open_file(@peer_db, type: :set)

  for [{id, _pid, ip, port}] <- :dets.match(@peer_db, :"$1") do
    :dets.insert(@peer_db, {id, nil, ip, port})
    # connect blindly to everyone
    add_peer(ip, port)
  end

  {:ok, _} = :dets.open_file(@shard_db, type: :set)

  for [{id, manifest, _pid}] <- :dets.match(@shard_db, :"$1") do
    :dets.insert(@shard_db, {id, manifest, nil})
    spawn(fn -> Shard.Manifest.start(manifest) end)
  end

  {:ok, _} = :dets.open_file(@shard_peer_db, type: :bag)

  # :shard_procs is named and protected so that dispatch/2 can read it from
  # other processes; the outbox is private to this server.
  :ets.new(:shard_procs, [:set, :protected, :named_table])
  outbox = :ets.new(:outbox, [:bag, :private])

  {:ok, %{my_port: my_port, outbox: outbox}}
end
# Registers `pid` as the main process of `shard_id`.
# Replies :redundant when the shard database already records a process for
# this shard that is still alive; otherwise monitors `pid`, stores it together
# with `manifest` and replies :ok.
def handle_call({:register, shard_id, manifest, pid}, _from, state) do
  already_running? =
    case :dets.lookup(@shard_db, shard_id) do
      [{^shard_id, _manifest, current}] when current != nil -> Process.alive?(current)
      _ -> false
    end

  if already_running? do
    {:reply, :redundant, state}
  else
    Process.monitor(pid)
    :dets.insert(@shard_db, {shard_id, manifest, pid})
    {:reply, :ok, state}
  end
end
# Records `pid` as the handler for packets addressed to `path` of `shard_id`
# and monitors it; the :DOWN handler removes the entry again.
def handle_cast({:dispatch_to, shard_id, path, pid}, state) do
  handler_key = {shard_id, path}
  :ets.insert(:shard_procs, {handler_key, pid})
  Process.monitor(pid)
  {:noreply, state}
end
# A peer announced interest in a list of shards. For each shard present in the
# shard database, remember the {shard, peer} pair and forward the announcement
# to the pid stored for that shard. Shards we do not have are skipped.
def handle_cast({:interested, peer_id, shards}, state) do
  Enum.each(shards, fn shard_id ->
    case :dets.lookup(@shard_db, shard_id) do
      [] ->
        nil

      [{^shard_id, _manifest, shard_pid}] ->
        :dets.insert(@shard_peer_db, {shard_id, peer_id})
        GenServer.cast(shard_pid, {:interested, peer_id})
    end
  end)

  {:noreply, state}
end
# A peer is no longer interested in a shard: drop the {shard, peer} pair from
# the shard/peer table.
def handle_cast({:not_interested, peer_id, shard_id}, state) do
  pair = {shard_id, peer_id}
  :dets.match_delete(@shard_peer_db, pair)
  {:noreply, state}
end
# Adds a {shard, peer} pair to the shard/peer table on behalf of a caller
# (dispatch/2 sends this request).
def handle_cast({:shard_peer_db_insert, shard_id, peer_id}, state) do
  pair = {shard_id, peer_id}
  :dets.insert(@shard_peer_db, pair)
  {:noreply, state}
end
# A connection to peer `pk` at `ip`:`port` is up and handled by `pid`.
# Cleans out other keys recorded for the same address, stores the new entry,
# announces all our shards to the peer and flushes its queued messages.
def handle_cast({:peer_up, pk, pid, ip, port}, state) do
  # Any different key recorded for this address is obsolete peer information.
  stale_keys =
    for [known_pk] <- :dets.match(@peer_db, {:"$1", :_, ip, port}),
        known_pk != pk,
        do: known_pk

  Enum.each(stale_keys, fn stale_pk ->
    :dets.delete(@peer_db, stale_pk)
    :dets.match_delete(@shard_peer_db, {:_, stale_pk})
  end)

  :dets.insert(@peer_db, {pk, pid, ip, port})

  # Send interested message for all our shards
  shard_ids =
    for [{shard_id, _manifest, _shard_pid}] <- :dets.match(@shard_db, :"$1") do
      shard_id
    end

  GenServer.cast(pid, {:send_msg, {:interested, shard_ids}})

  # Send queued messages, then forget them
  queued = :ets.lookup(state.outbox, pk)

  for {_peer, msg, _inserted_at} <- queued do
    GenServer.cast(pid, {:send_msg, msg})
  end

  :ets.delete(state.outbox, pk)

  {:noreply, state}
end
# The connection to peer `pk` is gone: keep its address but clear the pid.
def handle_cast({:peer_down, pk, ip, port}, state) do
  disconnected = {pk, nil, ip, port}
  :dets.insert(@peer_db, disconnected)
  {:noreply, state}
end
# Delivers `msg` to a peer that send/2 found to be disconnected or unknown.
#
# - Known but disconnected peer: start a connection attempt and queue the
#   message in the outbox; the :peer_up handler sends it once the connection
#   is established. Outbox entries older than 60 seconds are purged.
# - Peer that already has a connection process (it came up between the lookup
#   in send/2 and this request): hand the message to that process directly
#   instead of dropping it.
# - Peer not in the database: the message is dropped and logged.
def handle_cast({:connect_and_send, peer_id, msg}, state) do
  case :dets.lookup(@peer_db, peer_id) do
    [{^peer_id, nil, ip, port}] ->
      add_peer(ip, port, state)
      currtime = System.os_time(:second)
      :ets.insert(state.outbox, {peer_id, msg, currtime})

      # :ets.select_delete/2 only removes objects for which the match-spec
      # body returns `true`, so the body must be [true].
      outbox_cleanup = [
        {{:_, :_, :"$1"}, [{:<, :"$1", currtime - 60}], [true]}
      ]

      :ets.select_delete(state.outbox, outbox_cleanup)

    [{^peer_id, pid, _ip, _port}] ->
      GenServer.cast(pid, {:send_msg, msg})

    _ ->
      Logger.info "Dropping message #{inspect msg} for peer #{inspect peer_id}: peer not in database"
  end

  {:noreply, state}
end
# Starts a connection attempt for every listed peer that is in the peer
# database without a pid; all other keys are ignored.
def handle_cast({:try_connect, pk_list}, state) do
  Enum.each(pk_list, fn pk ->
    with [{^pk, nil, ip, port}] <- :dets.lookup(@peer_db, pk) do
      add_peer(ip, port, state)
    end
  end)

  {:noreply, state}
end
# Server side of add_peer/2: starts a connection attempt to `ip`:`port`.
def handle_cast({:add_peer, ip, port}, state) do
  _connector = add_peer(ip, port, state)
  {:noreply, state}
end
# A monitored process went down: remove every handler entry in :shard_procs
# that points at it.
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
  dead_handler = {:_, pid}
  :ets.match_delete(:shard_procs, dead_handler)
  {:noreply, state}
end
# Tries to connect to `ip`:`port` in a spawned process so the server is not
# blocked by the TCP connect. On success a SNet.TCPConn child is started under
# Shard.DynamicSupervisor and given ownership of the socket; on failure the
# attempt is only logged.
defp add_peer(ip, port, state) do
  spawn(fn ->
    connect_opts = [:binary, packet: 2, active: false]

    with {:ok, socket} <- :gen_tcp.connect(ip, port, connect_opts) do
      conn_spec = {SNet.TCPConn, %{socket: socket, my_port: state.my_port}}
      {:ok, conn_pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, conn_spec)
      :ok = :gen_tcp.controlling_process(socket, conn_pid)
    else
      _ ->
        Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped"
    end
  end)
end
# ================
# PUBLIC INTERFACE
# ================
@doc """
Asks the manager to connect to the peer at the given ip address and port.

The request is sent as a cast, so this returns without waiting for the
connection attempt.
"""
def add_peer(ip, port) do
  request = {:add_peer, ip, port}
  GenServer.cast(__MODULE__, request)
end
@doc """
Sends `msg` to the peer identified by `peer_id`.

When the peer database records a connection process for the peer, the message
is cast to that process as `{:send_msg, msg}`. Otherwise the message is handed
to the manager with a `:connect_and_send` request.
"""
def send(peer_id, msg) do
  with [{^peer_id, conn_pid, _ip, _port}] when conn_pid != nil <-
         :dets.lookup(@peer_db, peer_id) do
    GenServer.cast(conn_pid, {:send_msg, msg})
  else
    _ -> GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg})
  end
end
@doc """
Dispatches a message received from `peer_id`.

- `{shard_id, path, msg}` is forwarded to the process registered for that
  shard and path. If the shard is not in the shard database, the peer is told
  `{:not_interested, shard_id}` instead.
- `{:interested, shards}` and `{:not_interested, shard}` are passed on to the
  manager process.
"""
def dispatch(peer_id, {shard_id, path, msg}) do
  case :dets.lookup(@shard_db, shard_id) do
    [_known_shard] ->
      remember_shard_peer(shard_id, peer_id)
      forward_to_handler(peer_id, shard_id, path, msg)

    [] ->
      __MODULE__.send(peer_id, {:not_interested, shard_id})
  end
end

def dispatch(peer_id, {:interested, shards}) do
  request = {:interested, peer_id, shards}
  GenServer.cast(__MODULE__, request)
end

def dispatch(peer_id, {:not_interested, shard}) do
  request = {:not_interested, peer_id, shard}
  GenServer.cast(__MODULE__, request)
end

# Asks the manager to record the {shard, peer} pair unless it is already stored.
defp remember_shard_peer(shard_id, peer_id) do
  if :dets.match(@shard_peer_db, {shard_id, peer_id}) == [] do
    GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_id})
  end
end

# Casts the message to the handler registered for {shard_id, path}, or logs
# that it is being dropped when no handler is registered.
defp forward_to_handler(peer_id, shard_id, path, msg) do
  case :ets.lookup(:shard_procs, {shard_id, path}) do
    [{{^shard_id, ^path}, handler}] ->
      GenServer.cast(handler, {:msg, peer_id, shard_id, path, msg})

    [] ->
      Logger.info("Warning: dropping message for #{inspect shard_id}/#{inspect path}, no handler running.\n\t#{inspect msg}")
  end
end
@doc """
Registers `pid` as the main process for the shard `shard_id`.

This is a synchronous call to the manager. It returns `:ok` when the
registration was accepted, or `:redundant` when another live process is
already registered for the shard, in which case the calling process must exit.
"""
def register(shard_id, manifest, pid) do
  request = {:register, shard_id, manifest, pid}
  GenServer.call(__MODULE__, request)
end
@doc """
Registers `pid` as the handler for packets of shard `shard_id` addressed to
`path`.

The registration is sent to the manager as a cast.
"""
def dispatch_to(shard_id, path, pid) do
  request = {:dispatch_to, shard_id, path, pid}
  GenServer.cast(__MODULE__, request)
end
@doc """
Returns every entry of the shard database as a list of
`{id, manifest, pid | nil}` tuples.
"""
def list_shards() do
  @shard_db
  |> :dets.match(:"$1")
  |> Enum.map(fn [entry] -> entry end)
end
@doc """
Returns every entry of the peer database as a list of
`{id, pid | nil, ip, port}` tuples.
"""
def list_peers() do
  @peer_db
  |> :dets.match(:"$1")
  |> Enum.map(fn [entry] -> entry end)
end
@doc """
Returns the ids of all peers recorded in the shard/peer table for `shard_id`.
"""
def get_shard_peers(shard_id) do
  peer_pattern = {shard_id, :"$1"}

  @shard_peer_db
  |> :dets.match(peer_pattern)
  |> Enum.map(fn [peer_id] -> peer_id end)
end
end