defprotocol Shard.Manifest do
  @moduledoc """
  Identity of a shard.

  A shard manifest is a data structure that uniquely defines the identity of a
  shard. The hash of the manifest is the unique identifier of that shard on the
  network.

  This protocol is implemented by the manifest structs of the different shard
  types. Its single operation, `module/1`, returns the main module for the
  shard processes. If `Mod` is the returned module, it must export a function
  with the signature

      {:ok, pid} = Mod.start_link(manifest)

  which is called when the shard must be started.
  """

  @doc """
  Returns the main module of the shard described by `manifest`.
  """
  @spec module(t) :: module()
  def module(manifest)
end
defmodule Shard.Manager do
@moduledoc """
Maintains several important tables:

- `:shard_db` (persistent with DETS)

  List of `{ id, manifest, state }`

- `:peer_db` (persistent with DETS)

  Multi-list of `{ shard_id, peer_info }` (TODO: add health info: last seen, ping, etc.)

      peer_info := {:inet, ip, port}
      TODO peer_info |= {:inet6, ip, port} | {:onion, name}

- `:shard_procs` (not persistent)

  List of `{ {id, path}, pid }`
"""
use GenServer
require Logger
# Names of the two persistent DETS tables, built by joining the :data_path
# setting of the :shard application with a fixed file name.
# NOTE: module attributes are evaluated at compile time, so :data_path is read
# when this module is compiled, not when the manager starts.
@shard_db String.to_atom(Path.join(Application.get_env(:shard, :data_path), "shard_db"))
@peer_db String.to_atom(Path.join(Application.get_env(:shard, :data_path), "peer_db"))
@doc """
Starts the shard manager and registers it locally under the name of this module.

The argument is ignored.
"""
def start_link(_opts) do
  server_opts = [name: __MODULE__]
  GenServer.start_link(__MODULE__, nil, server_opts)
end
# GenServer callback. Opens the two persistent DETS tables and creates the
# in-memory :shard_procs ETS table.
#
# The server state is a map from a tracked pid to the {shard_id, path} key
# under which that pid is stored in :shard_procs; it starts out empty.
#
# Returns {:ok, %{}} on success, or {:stop, {:dets_open_failed, reason}} when
# one of the DETS files cannot be opened, so that a storage problem is reported
# at start-up instead of surfacing later as failing table operations.
def init(_) do
  # Trap exits so that the termination of a linked process is delivered as an
  # {:EXIT, pid, reason} message (handled in handle_info/2).
  Process.flag(:trap_exit, true)

  with {:ok, _} <- :dets.open_file(@shard_db, type: :set),
       {:ok, _} <- :dets.open_file(@peer_db, type: :bag) do
    # :protected => only this process writes, any process may read.
    :ets.new(:shard_procs, [:set, :protected, :named_table])
    {:ok, %{}}
  else
    {:error, reason} -> {:stop, {:dets_open_failed, reason}}
  end
end
# {:find_or_start, shard_id, manifest}
#
# Replies with the pid of the main process (path nil) of the shard. The shard
# is recorded in the shard table if it was not known yet, and its process is
# started through the module given by Shard.Manifest.module/1 when no main
# process is registered. The reply is always a bare pid.
def handle_call({:find_or_start, shard_id, manifest}, _from, state) do
  remember_shard(shard_id, manifest)
  key = {shard_id, nil}

  case :ets.lookup(:shard_procs, key) do
    [] ->
      shard_module = Shard.Manifest.module(manifest)
      {:ok, pid} = shard_module.start_link(manifest)
      :ets.insert(:shard_procs, {key, pid})
      {:reply, pid, Map.put(state, pid, key)}

    # :ets.lookup/2 returns a list of stored objects, so the pid has to be
    # taken out of the {key, pid} tuple rather than replying with the list.
    [{^key, pid}] ->
      {:reply, pid, state}
  end
end

# {:register, shard_id, manifest, pid}
#
# Request sent by register/3. Replies :ok when `pid` is (or becomes) the main
# process of the shard, and :redundant when a different process already holds
# that role.
def handle_call({:register, shard_id, manifest, pid}, _from, state) do
  key = {shard_id, nil}

  case :ets.lookup(:shard_procs, key) do
    [] ->
      remember_shard(shard_id, manifest)
      # The registering process was not started here, so monitor it in order
      # to receive a :DOWN message and drop the entry when it terminates.
      Process.monitor(pid)
      :ets.insert(:shard_procs, {key, pid})
      {:reply, :ok, Map.put(state, pid, key)}

    [{^key, ^pid}] ->
      {:reply, :ok, state}

    [{^key, _other_pid}] ->
      {:reply, :redundant, state}
  end
end

# Stores {shard_id, manifest, nil} in the shard table unless an entry for
# shard_id already exists; an existing entry (and its saved state) is kept.
defp remember_shard(shard_id, manifest) do
  :dets.insert_new(@shard_db, {shard_id, manifest, nil})
end
# {:dispatch_to, shard_id, path, pid}
#
# Records `pid` as the handler for {shard_id, path} in :shard_procs and in the
# server state. The process is monitored only when `path` is not nil.
def handle_cast({:dispatch_to, shard_id, path, pid}, state) do
  key = {shard_id, path}
  :ets.insert(:shard_procs, {key, pid})
  tracked = Map.put(state, pid, key)

  if not is_nil(path), do: Process.monitor(pid)

  {:noreply, tracked}
end
# {:peer_db_insert, shard_id, peer_info}
#
# Adds the {shard_id, peer_info} pair to the peer table.
def handle_cast({:peer_db_insert, shard_id, peer_info}, state) do
  entry = {shard_id, peer_info}
  :dets.insert(@peer_db, entry)
  {:noreply, state}
end
# {:peer_db_delete, shard_id, peer_info}
#
# Removes exactly the {shard_id, peer_info} pair from the peer table.
#
# delete_object/2 is used instead of match_delete/2 because the latter treats
# its argument as a match pattern: a shard id or peer info containing :_ or
# :"$1" would then act as a wildcard and remove unrelated entries.
def handle_cast({:peer_db_delete, shard_id, peer_info}, state) do
  :dets.delete_object(@peer_db, {shard_id, peer_info})
  {:noreply, state}
end
# A monitored process terminated: forget everything registered for it.
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
  {:noreply, forget_process(pid, state)}
end

# A linked process terminated (exits are trapped in init/1): same cleanup.
def handle_info({:EXIT, pid, _reason}, state) do
  {:noreply, forget_process(pid, state)}
end

# Defining handle_info/2 replaces the default clause injected by
# `use GenServer`, so any other message must be accepted explicitly here;
# otherwise it would crash the manager with a FunctionClauseError.
def handle_info(message, state) do
  Logger.info("Shard.Manager ignoring unexpected message: #{inspect(message)}")
  {:noreply, state}
end

# Drops `pid` from the server state and removes every :shard_procs entry that
# points to it. The state keeps only one key per pid, while the same pid may
# have been stored under several {shard_id, path} keys, so the table is
# cleaned by pid rather than by that single remembered key.
# Untracked pids are ignored, which also makes duplicate notifications for the
# same process harmless.
defp forget_process(pid, state) do
  if Map.has_key?(state, pid) do
    :ets.match_delete(:shard_procs, {:_, pid})
    Map.delete(state, pid)
  else
    state
  end
end
# ======================
# CALLED BY SNet.TcpConn
# ======================
@doc """
Dispatches an incoming message to the correct shard process.

The last argument is the message itself:

  * `{:interested, shards}` - for every listed shard id present in the shard
    table, the peer is recorded in the peer table and
    `{:interested, conn_pid, auth}` is cast to the main process of the shard,
    which is started if needed. Unknown shard ids are skipped.
  * `{:not_interested, shard}` - the peer is removed from the peer table for
    that shard.
  * `{shard_id, path, msg}` - the message is forwarded to the process
    registered for `{shard_id, path}`.
"""
def incoming(conn_pid, peer_info, auth, {:interested, shards}) do
  Enum.map(shards, fn shard_id ->
    case :dets.lookup(@shard_db, shard_id) do
      [] ->
        nil

      [{^shard_id, manifest, _}] ->
        GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info})

        # Read the process table directly; only ask the manager when the main
        # process of the shard is not registered yet.
        main_proc =
          case :ets.lookup(:shard_procs, {shard_id, nil}) do
            [{{^shard_id, nil}, found}] ->
              found

            [] ->
              GenServer.call(__MODULE__, {:find_or_start, shard_id, manifest})
          end

        GenServer.cast(main_proc, {:interested, conn_pid, auth})
    end
  end)
end
# {:not_interested, shard}: asks the manager to remove this peer from the
# peer table entry of the given shard.
def incoming(_conn_pid, peer_info, _auth, {:not_interested, shard}) do
  request = {:peer_db_delete, shard, peer_info}
  GenServer.cast(__MODULE__, request)
end
# {shard_id, path, msg}: a message addressed to a path inside a shard.
#
# For an unknown shard the connection is told to send {:not_interested, shard_id}
# back. Otherwise the peer is recorded and the message is cast to the process
# registered for {shard_id, path}; when there is none, the manager is asked for
# the shard process through :find_or_start and the message goes there.
def incoming(conn_pid, peer_info, auth, {shard_id, path, msg}) do
  case :dets.lookup(@shard_db, shard_id) do
    [{^shard_id, manifest, _}] ->
      GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info})

      handler =
        case :ets.lookup(:shard_procs, {shard_id, path}) do
          [{{^shard_id, ^path}, found}] ->
            found

          [] ->
            GenServer.call(__MODULE__, {:find_or_start, shard_id, manifest})
        end

      GenServer.cast(handler, {:msg, conn_pid, auth, shard_id, path, msg})

    [] ->
      GenServer.cast(conn_pid, {:send_msg, {:not_interested, shard_id}})
  end
end
# ================
# CALLED BY Sapp.*
# ================
@doc """
Registers `pid` as the main process for the shard `shard_id`.

Sends a synchronous `{:register, shard_id, manifest, pid}` request to the
manager and returns its reply: either `:ok` or `:redundant`. In the latter
case the calling process must exit.
"""
def register(shard_id, manifest, pid) do
  request = {:register, shard_id, manifest, pid}
  GenServer.call(__MODULE__, request)
end
@doc """
Registers `pid` as the handler of the packets of shard `shard_id` that are
addressed to `path`.

The request is sent asynchronously to the manager.
"""
def dispatch_to(shard_id, path, pid) do
  request = {:dispatch_to, shard_id, path, pid}
  GenServer.cast(__MODULE__, request)
end
@doc """
Returns the peer info of every peer recorded as interested in the shard
`shard_id`.
"""
def get_shard_peers(shard_id) do
  @peer_db
  |> :dets.lookup(shard_id)
  |> Enum.flat_map(fn
    {_shard_id, peer_info} -> [peer_info]
    _other -> []
  end)
end
@doc """
Returns the state value saved for the shard `shard_id`, or `nil` when the
shard table holds no matching record.
"""
def load_state(shard_id) do
  with [{^shard_id, _manifest, saved}] <- :dets.lookup(@shard_db, shard_id) do
    saved
  else
    _ -> nil
  end
end
@doc """
Saves `state` as the state value of the shard `shard_id`, keeping the manifest
already stored for it.

The shard must already be present in the shard table; otherwise the lookup
matches no clause and a `CaseClauseError` is raised.
"""
def save_state(shard_id, state) do
  existing = :dets.lookup(@shard_db, shard_id)

  case existing do
    [{^shard_id, manifest, _previous}] ->
      updated = {shard_id, manifest, state}
      :dets.insert(@shard_db, updated)
  end
end
# ================
# CALLED BY ANYONE
# ================
@doc """
Returns the pid of the main process of the shard `shard_id`, or `nil` when no
such process is registered.
"""
def find_proc(shard_id) do
  with [{{^shard_id, _}, pid}] <- :ets.lookup(:shard_procs, {shard_id, nil}) do
    pid
  else
    _ -> nil
  end
end
@doc """
Returns the pid for the shard defined by `manifest`, starting it if it does
not exist.

The shard id is the hash of the manifest (`SData.term_hash/1`). The process
table is read directly first; the manager is only called when no main process
is registered for that id.
"""
def find_or_start(manifest) do
  shard_id = SData.term_hash(manifest)

  case :ets.lookup(:shard_procs, {shard_id, nil}) do
    [] ->
      GenServer.call(__MODULE__, {:find_or_start, shard_id, manifest})

    [{{^shard_id, nil}, pid}] ->
      pid
  end
end
@doc """
Returns the list of all shards, i.e. every record stored in the shard table.
"""
def list_shards() do
  rows = :dets.match(@shard_db, :"$1")

  # Each match is a one-element list holding the whole record.
  Enum.flat_map(rows, fn
    [record] -> [record]
    _other -> []
  end)
end
end