defprotocol Shard.Manifest do
  @moduledoc """
  Protocol implemented by the manifest structs of the different shard types.

  A shard manifest is a data structure that uniquely defines the identity of a
  shard: the hash of the manifest is the unique identifier of that shard on the
  network.

  The protocol exposes a single operation, `module/1`, which returns the main
  module for the shard processes. That module must export a function with the
  signature:

      {:ok, pid} = <module>.start_link(manifest)

  which is called when the shard must be started.
  """

  @doc """
  Returns the main module of the shard type described by `manifest`.
  """
  def module(manifest)
end
defmodule Shard.Manager do
@moduledoc"""
Maintains several important tables:
- :shard_db (persistent with DETS)
List of
{ id, manifest, why_have_it, state }
why_have_it := {:pinned, %MapSet{who requires it...}, %MapSet{who it requires...}}
| {:req, %MapSet{who requires it...}, %MapSet{who it requires...}}
| {:cached, expiry_date}
- :peer_db (persistent with DETS)
Multi-list of
{ shard_id, peer_info } # TODO: add health info (last seen, ping, etc)
peer_info := {:inet, ip, port}
TODO peer_info |= {:inet6, ip, port} | {:onion, name}
- :shard_procs (not persistent)
List of
{ {id, path}, pid }
"""
use GenServer
require Logger
# Lifetime, in seconds, given to a cached shard entry each time it is (re)marked
# as cached: 24 hours.
@cache_ttl 24 * 60 * 60
# Delay, in seconds, between two cache cleaning passes: one minute.
@clean_cache_every 60
# Names of the two DETS tables, built from the :data_path entry of the :shard
# application environment. These are module attributes, so the environment is
# read when this module is compiled, not when the manager starts.
@shard_db Path.join(Application.get_env(:shard, :data_path), "shard_db") |> String.to_atom()
@peer_db Path.join(Application.get_env(:shard, :data_path), "peer_db") |> String.to_atom()
@doc """
Starts the manager process, registered locally under the name of this module.

The argument is ignored. Returns whatever `GenServer.start_link/3` returns.
"""
def start_link(_) do
  server_opts = [name: __MODULE__]
  GenServer.start_link(__MODULE__, nil, server_opts)
end
# GenServer callback: opens the persistent tables, creates the process table
# and schedules the first cache cleaning pass.
#
# The server state is a map from pid to the `{shard_id, path}` key under which
# that pid is stored in the :shard_procs ETS table. It starts empty.
#
# Raises a MatchError if one of the DETS files cannot be opened.
def init(_) do
  # Exits of linked processes are received as {:EXIT, pid, reason} messages
  # (handled in handle_info/2) instead of terminating the manager.
  Process.flag(:trap_exit, true)

  # Fail fast with the actual reason if a table cannot be opened. Every other
  # operation of this module needs both tables, so continuing would only move
  # the failure to the first handler that touches them.
  {:ok, _} = :dets.open_file(@shard_db, type: :set)
  {:ok, _} = :dets.open_file(@peer_db, type: :bag)

  :ets.new(:shard_procs, [:set, :protected, :named_table])

  # First cleaning pass after one second; later passes reschedule themselves.
  Process.send_after(self(), :clean_cache, 1000)
  {:ok, %{}}
end
# Synchronous request: replies with the pid of the main process of `shard_id`,
# starting it from `manifest` when no such process is registered yet.
def handle_call({:find_or_start, shard_id, manifest}, _from, state) do
  {shard_pid, new_state} = find_or_start(state, shard_id, manifest)
  {:reply, shard_pid, new_state}
end
# Registers `pid` as the handler for packets of `shard_id` addressed to `path`.
# The pid is recorded both in the :shard_procs table and in the server state.
def handle_cast({:dispatch_to, shard_id, path, pid}, state) do
  key = {shard_id, path}
  :ets.insert(:shard_procs, {key, pid})

  # Only handlers registered for a non-nil path are monitored here.
  if path != nil, do: Process.monitor(pid)

  {:noreply, Map.put(state, pid, key)}
end
# Records in the peer table that `peer_info` is associated with `shard_id`.
def handle_cast({:peer_db_insert, shard_id, peer_info}, state) do
  entry = {shard_id, peer_info}
  :dets.insert(@peer_db, entry)
  {:noreply, state}
end
# Removes from the peer table the objects matching `{shard_id, peer_info}`.
def handle_cast({:peer_db_delete, shard_id, peer_info}, state) do
  pattern = {shard_id, peer_info}
  :dets.match_delete(@peer_db, pattern)
  {:noreply, state}
end
# Replaces the saved state of `shard_id` with `shst`, keeping its manifest and
# its reason for being stored untouched.
#
# A request for a shard that has no entry in the shard table is ignored. The
# request is an asynchronous cast, so the entry may have been removed (for
# instance by the cache cleaning pass) before it is processed; this must not
# crash the manager.
def handle_cast({:save_state, shard_id, shst}, state) do
  case :dets.lookup(@shard_db, shard_id) do
    [{^shard_id, manifest, why_have_it, _old_state}] ->
      :dets.insert(@shard_db, {shard_id, manifest, why_have_it, shst})

    _ ->
      nil
  end

  {:noreply, state}
end
# Marks `shard_id` as pinned.
#
# - A cached shard becomes pinned with empty "required by" / "requires" sets;
#   its main process is found or started and asked to `:send_deps`.
# - A shard kept as `:req` becomes pinned and keeps both of its sets.
# - Anything else (unknown shard, already pinned) is left as is.
def handle_cast({:pin, shard_id}, state) do
  new_state =
    case :dets.lookup(@shard_db, shard_id) do
      [{^shard_id, manifest, {:cached, _expiry}, shst}] ->
        pinned = {:pinned, %MapSet{}, %MapSet{}}
        :dets.insert(@shard_db, {shard_id, manifest, pinned, shst})
        {shard_pid, started_state} = find_or_start(state, shard_id, manifest)
        GenServer.cast(shard_pid, :send_deps)
        started_state

      [{^shard_id, manifest, {:req, required_by, requires}, shst}] ->
        :dets.insert(@shard_db, {shard_id, manifest, {:pinned, required_by, requires}, shst})
        state

      _ ->
        state
    end

  {:noreply, new_state}
end
# Removes the pin on `shard_id`. Only entries currently stored as pinned are
# affected.
#
# - If other shards still require it, it is kept with reason `:req`.
# - Otherwise the dependency link to each shard it requires is removed, and the
#   shard itself becomes a cached entry with a fresh expiry date.
def handle_cast({:unpin, shard_id}, state) do
  with [{^shard_id, manifest, {:pinned, required_by, requires}, shst}] <-
         :dets.lookup(@shard_db, shard_id) do
    if MapSet.size(required_by) == 0 do
      Enum.each(requires, fn dep -> rm_dep_link(shard_id, dep) end)
      :dets.insert(@shard_db, {shard_id, manifest, cached(), shst})
    else
      :dets.insert(@shard_db, {shard_id, manifest, {:req, required_by, requires}, shst})
    end
  end

  {:noreply, state}
end
# Updates the set of shards required by `shard_id` to the shards described by
# `manifests`. Ignored unless `shard_id` is currently pinned or `:req`.
#
# Dependency links are added for ids that were not required before and removed
# for ids that are no longer required; the new set is then written back.
def handle_cast({:dep_list, shard_id, manifests}, state) do
  case :dets.lookup(@shard_db, shard_id) do
    [{^shard_id, manifest, {reason, required_by, old_deps}, shst}]
    when reason in [:pinned, :req] ->
      # Index the announced manifests by their hash, which is the shard id.
      manifest_by_id = Map.new(manifests, fn m -> {SData.term_hash(m), m} end)
      new_deps = manifest_by_id |> Map.keys() |> MapSet.new()

      # Adding a link may start a shard process, hence the threaded state.
      state_after_adds =
        new_deps
        |> MapSet.difference(old_deps)
        |> Enum.reduce(state, fn dep_id, acc ->
          add_dep_link(acc, shard_id, dep_id, Map.get(manifest_by_id, dep_id))
        end)

      old_deps
      |> MapSet.difference(new_deps)
      |> Enum.each(fn dep_id -> rm_dep_link(shard_id, dep_id) end)

      :dets.insert(@shard_db, {shard_id, manifest, {reason, required_by, new_deps}, shst})
      {:noreply, state_after_adds}

    _ ->
      {:noreply, state}
  end
end
# Periodic cache cleaning pass.
#
# Deletes from the shard table every cached entry whose expiry date is in the
# past. If a main process is registered for such a shard it is sent
# `:delete_shard`. The next pass is scheduled before returning.
def handle_info(:clean_cache, state) do
  now = System.os_time(:second)

  # Match spec: for objects {id, _, {:cached, expiry}, _} with expiry < now,
  # return id. The body is the bare variable $1, so select/2 yields a flat
  # list of ids (not a list of one-element lists as :dets.match/2 would).
  expired_ids =
    :dets.select(@shard_db, [
      {{:"$1", :_, {:cached, :"$2"}, :_}, [{:<, :"$2", now}], [:"$1"]}
    ])

  for id <- expired_ids do
    case :ets.lookup(:shard_procs, {id, nil}) do
      [{{^id, nil}, pid}] ->
        GenServer.cast(pid, :delete_shard)

      _ ->
        nil
    end

    :dets.delete(@shard_db, id)
  end

  Process.send_after(self(), :clean_cache, @clean_cache_every * 1000)
  {:noreply, state}
end
# A monitored process went down: handled exactly like the exit of a linked
# process, by delegating to the {:EXIT, pid, reason} clause.
def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
  exit_message = {:EXIT, pid, reason}
  handle_info(exit_message, state)
end
# A process exited. If it is known to the manager, its entry is removed from
# the :shard_procs table and from the server state; unknown pids are ignored.
def handle_info({:EXIT, pid, _reason}, state) do
  case Map.pop(state, pid) do
    {nil, _} ->
      {:noreply, state}

    {proc_key, remaining} ->
      :ets.delete(:shard_procs, proc_key)
      {:noreply, remaining}
  end
end
# Returns `{pid, state}` where `pid` is the main process of `shard_id`.
#
# Shard table: an unknown shard is inserted as a cached entry without saved
# state; an entry already cached with the same manifest gets a fresh expiry
# date; any other entry is left untouched.
#
# Process table: when no main process (path `nil`) is registered, one is
# started through the `start_link/1` of the module given by the manifest, then
# recorded in :shard_procs and in the returned state. A start that does not
# return `{:ok, pid}` raises a MatchError.
defp find_or_start(state, shard_id, manifest) do
  case :dets.lookup(@shard_db, shard_id) do
    [{^shard_id, ^manifest, {:cached, _expiry}, shst}] ->
      :dets.insert(@shard_db, {shard_id, manifest, cached(), shst})

    [] ->
      :dets.insert(@shard_db, {shard_id, manifest, cached(), nil})

    _ ->
      nil
  end

  proc_key = {shard_id, nil}

  case :ets.lookup(:shard_procs, proc_key) do
    [{{^shard_id, nil}, pid}] ->
      {pid, state}

    [] ->
      shard_module = Shard.Manifest.module(manifest)
      {:ok, pid} = shard_module.start_link(manifest)
      :ets.insert(:shard_procs, {proc_key, pid})
      {pid, Map.put(state, pid, proc_key)}
  end
end
# Builds the `{:cached, expiry_date}` reason for an entry cached from now on.
# The expiry date is the current OS time in seconds plus @cache_ttl.
defp cached() do
  # :second is the supported unit name; the plural :seconds is deprecated.
  expiry = System.os_time(:second) + @cache_ttl
  {:cached, expiry}
end
# Records that shard `shard_id` requires shard `id2`, whose manifest is `m2`.
# Returns the (possibly updated) server state.
#
# - If `id2` is already stored as pinned or `:req` with that manifest,
#   `shard_id` is added to the set of shards that require it.
# - Otherwise `id2` is (re)inserted with reason `:req`, required only by
#   `shard_id`; its main process is found or started and asked to `:send_deps`.
#   When the previous entry was a cached one for the same manifest, its saved
#   state is carried over rather than reset to nil, as pinning a cached shard
#   also does.
defp add_dep_link(state, shard_id, id2, m2) do
  case :dets.lookup(@shard_db, id2) do
    [{^id2, ^m2, {reason, a, b}, shst}] when reason == :pinned or reason == :req ->
      :dets.insert(@shard_db, {id2, m2, {reason, MapSet.put(a, shard_id), b}, shst})
      state

    found ->
      shst =
        case found do
          [{^id2, ^m2, {:cached, _expiry}, saved}] -> saved
          _ -> nil
        end

      a = MapSet.new() |> MapSet.put(shard_id)
      :dets.insert(@shard_db, {id2, m2, {:req, a, %MapSet{}}, shst})
      {pid, state} = find_or_start(state, id2, m2)
      GenServer.cast(pid, :send_deps)
      state
  end
end
# Records that shard `shard_id` no longer requires shard `id2`.
#
# `shard_id` is removed from the set of shards requiring `id2`. If `id2` was
# only kept because it was required (`:req`) and nothing requires it any more,
# it becomes a cached entry and the links to its own dependencies are removed
# recursively.
#
# Nothing is done when `id2` is unknown or is not stored as pinned / `:req`.
# The updates done here are separate table writes, so an interrupted update
# can leave a link pointing at an entry that is already cached or gone;
# removing such a link again must be harmless rather than crash the caller.
defp rm_dep_link(shard_id, id2) do
  case :dets.lookup(@shard_db, id2) do
    [{^id2, m2, {reason, a, b}, shst}] when reason == :pinned or reason == :req ->
      a2 = MapSet.delete(a, shard_id)

      if reason == :req and MapSet.size(a2) == 0 do
        :dets.insert(@shard_db, {id2, m2, cached(), shst})

        for dep <- b do
          rm_dep_link(id2, dep)
        end
      else
        :dets.insert(@shard_db, {id2, m2, {reason, a2, b}, shst})
      end

    _ ->
      nil
  end
end
# ======================
# CALLED BY SNet.TcpConn
# ======================
@doc """
Dispatches an incoming message to the correct shard process.

Three message shapes are handled:

- `{:interested, shards}`: for every listed shard id present in the shard
table, the peer is recorded for that shard and the main shard process
(started if needed) receives `{:interested, conn_pid, auth}`. Unknown shard
ids are skipped.
- `{:not_interested, shard}`: the peer is removed from the peers of `shard`.
- `{shard_id, path, msg}`: the message is forwarded to the process registered
for `{shard_id, path}`, or to the main shard process (started if needed) when
there is none. If the shard is unknown, `{:not_interested, shard_id}` is sent
back through `conn_pid`.
"""
def incoming(conn_pid, peer_info, auth, {:interested, shards}) do
  Enum.map(shards, fn shard_id ->
    case :dets.lookup(@shard_db, shard_id) do
      [] ->
        nil

      [{^shard_id, manifest, _why, _shst}] ->
        GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info})

        main_pid =
          case :ets.lookup(:shard_procs, {shard_id, nil}) do
            [{{^shard_id, nil}, found}] ->
              found

            [] ->
              GenServer.call(__MODULE__, {:find_or_start, shard_id, manifest})
          end

        GenServer.cast(main_pid, {:interested, conn_pid, auth})
    end
  end)
end
# The peer is no longer interested in `shard`: ask the manager to remove the
# peer from the peer table entry of that shard.
def incoming(_conn_pid, peer_info, _auth, {:not_interested, shard}) do
  request = {:peer_db_delete, shard, peer_info}
  GenServer.cast(__MODULE__, request)
end
# Message `msg` addressed to `path` inside shard `shard_id`.
#
# Unknown shard: `{:not_interested, shard_id}` is sent back via `conn_pid`.
# Known shard: the peer is recorded, then the message goes to the process
# registered for `{shard_id, path}`, or to the main shard process (found or
# started by the manager) when no process is registered for that path.
def incoming(conn_pid, peer_info, auth, {shard_id, path, msg}) do
  case :dets.lookup(@shard_db, shard_id) do
    [{^shard_id, manifest, _why, _shst}] ->
      GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info})

      target =
        case :ets.lookup(:shard_procs, {shard_id, path}) do
          [{{^shard_id, ^path}, handler}] ->
            handler

          [] ->
            GenServer.call(__MODULE__, {:find_or_start, shard_id, manifest})
        end

      GenServer.cast(target, {:msg, conn_pid, auth, shard_id, path, msg})

    [] ->
      GenServer.cast(conn_pid, {:send_msg, {:not_interested, shard_id}})
  end
end
# ================
# CALLED BY Sapp.*
# ================
@doc """
Registers `pid` as the main process for the shard `shard_id`.

Intended to return either `:ok` or `:redundant`; in the latter case the
calling process must exit.
"""
# NOTE(review): this module defines no handle_call clause for the
# {:register, ...} request sent below - confirm where it is meant to be handled.
def register(shard_id, manifest, pid) do
  request = {:register, shard_id, manifest, pid}
  GenServer.call(__MODULE__, request)
end
@doc """
Registers `pid` as the handler of the packets of shard `shard_id` that are
addressed to `path`.

The registration is requested asynchronously from the manager.
"""
def dispatch_to(shard_id, path, pid) do
  request = {:dispatch_to, shard_id, path, pid}
  GenServer.cast(__MODULE__, request)
end
@doc """
Returns the list of the peer info of all peers recorded as interested in the
shard `shard_id`.
"""
def get_shard_peers(shard_id) do
  entries = :dets.lookup(@peer_db, shard_id)

  for {_shard_id, peer_info} <- entries do
    peer_info
  end
end
@doc """
Returns the state value saved for the shard `shard_id`, or `nil` when the
shard has no entry in the shard table.
"""
def load_state(shard_id) do
  with [{^shard_id, _manifest, _why, saved}] <- :dets.lookup(@shard_db, shard_id) do
    saved
  else
    _ -> nil
  end
end
@doc """
Saves `state` as the state value of the shard `shard_id`.

The write is requested asynchronously from the manager.
"""
def save_state(shard_id, state) do
  request = {:save_state, shard_id, state}
  GenServer.cast(__MODULE__, request)
end
# ================
# CALLED BY ANYONE
# ================
@doc """
Returns the pid of the main process of the shard `shard_id`, or `nil` when no
such process is registered.
"""
def find_proc(shard_id) do
  main_key = {shard_id, nil}

  case :ets.lookup(:shard_procs, main_key) do
    [{{^shard_id, _path}, pid}] -> pid
    _ -> nil
  end
end
@doc """
Returns the pid of the main process of the shard defined by `manifest`.

The shard id is the hash of the manifest. When no process is registered for
it, the manager is asked to start one.
"""
def find_or_start(manifest) do
  shard_id = SData.term_hash(manifest)

  case :ets.lookup(:shard_procs, {shard_id, nil}) do
    [] ->
      GenServer.call(__MODULE__, {:find_or_start, shard_id, manifest})

    [{{^shard_id, nil}, pid}] ->
      pid
  end
end
@doc """
Returns the list of all shards in the shard table, as tuples:

{id, manifest, why_have_it}
"""
def list_shards() do
  # Matching on the single variable $1 yields one [object] list per entry.
  rows = :dets.match(@shard_db, :"$1")

  for [{id, manifest, why_have_it, _shst}] <- rows do
    {id, manifest, why_have_it}
  end
end
end