defprotocol Shard.Manifest do
  @moduledoc """
  A shard manifest is a data structure that uniquely defines the identity of
  the shard. The hash of the manifest is the unique identifier of that shard
  on the network.

  The Manifest protocol is implemented by the manifest structs of the
  different shard types. Its `module/1` operation returns the main module of
  the shard processes. That module must export a `start_link/1` function,
  used as

      {:ok, pid} = module.start_link(manifest)

  which is called when the shard must be started.
  """

  @doc """
  Get the module that implements the shard. All shard modules must have a
  function `start_link` that starts the shard process and takes a single
  argument: the manifest.
  """
  def module(manifest)

  @doc """
  Check if the manifest is valid.
  """
  def is_valid?(manifest)
end

defmodule Shard.Manager do
  @moduledoc """
  The manager is the main process by which shards are started, stopped, and
  their lifetime monitored.

  It maintains several important tables:

  - `@shard_db` (persistent with DETS, type `:set`), a list of:

    ```
    { id, manifest, why_have_it, state }

    why_have_it := {:pinned, %MapSet{who requires it...}, %MapSet{who it requires...}}
                 | {:req, %MapSet{who requires it...}, %MapSet{who it requires...}}
                 | {:cached, expiry_date}
    ```

    `expiry_date` is a Unix time in seconds; cached entries whose expiry date
    has passed are deleted by the periodic cache cleaning.

  - `@peer_db` (persistent with DETS, type `:bag`, so several entries per
    key), a list of:

    ```
    { shard_id, peer_info }    # TODO: add health info (last seen, ping, etc)

    peer_info := {:inet, ip, port}
                 TODO peer_info |= {:inet6, ip, port} | {:onion, name}
    ```

  - `:shard_procs` (not persistent, ETS), a list of:

    ```
    { {id, path}, pid }
    ```

    The path value is used to distinguish between a shard's main process
    (`path == nil`) and companion sub-processes such as a page store used by
    the shard.

  The GenServer state is a map `pid => {shard_id, path}` of the processes the
  manager is tracking, used to clean `:shard_procs` when they exit.
  """

  use GenServer
  require Logger

  # 24 hours
  @cache_ttl 3600 * 24
  # one minute
  @clean_cache_every 60

  # NOTE(review): these table names are computed at compile time from the
  # :shard application env; changing :data_path at runtime has no effect.
  @shard_db [Application.get_env(:shard, :data_path), "shard_db"] |> Path.join() |> String.to_atom()
  @peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join() |> String.to_atom()

  def start_link(_) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  def init(_) do
    # Trap exits so that crashes of linked shard processes arrive as
    # {:EXIT, pid, reason} messages instead of killing the manager, and so
    # that terminate/2 runs on shutdown.
    Process.flag(:trap_exit, true)

    # Fail loudly at startup if the persistent tables cannot be opened.
    {:ok, _} = :dets.open_file(@shard_db, type: :set)
    {:ok, _} = :dets.open_file(@peer_db, type: :bag)
    :ets.new(:shard_procs, [:set, :protected, :named_table])

    Process.send_after(self(), :clean_cache, 1000)
    {:ok, %{}}
  end

  def terminate(_reason, _state) do
    # Close the DETS files cleanly so they do not need repair on next open.
    :dets.close(@shard_db)
    :dets.close(@peer_db)
  end

  def handle_call({:find_or_start, shard_id, manifest}, _from, state) do
    {pid, state} = find_or_start(state, shard_id, manifest)
    {:reply, pid, state}
  end

  def handle_call({:register, shard_id, manifest, pid}, _from, state) do
    case :ets.lookup(:shard_procs, {shard_id, nil}) do
      [{{^shard_id, nil}, ^pid}] ->
        # Already registered as this shard's main process.
        {:reply, :ok, state}

      [{{^shard_id, nil}, _other_pid}] ->
        # Another process already is the main process; the caller must exit.
        {:reply, :redundant, state}

      [] ->
        remember_shard(shard_id, manifest)
        :ets.insert(:shard_procs, {{shard_id, nil}, pid})
        # The process was not necessarily started (and linked) by the manager,
        # so monitor it to clean up :shard_procs when it dies.
        Process.monitor(pid)
        {:reply, :ok, Map.put(state, pid, {shard_id, nil})}
    end
  end

  def handle_call({:delete, shard_id}, _from, state) do
    case :dets.lookup(@shard_db, shard_id) do
      [] ->
        {:reply, {:error, :not_found}, state}

      [{^shard_id, manifest, {:cached, _}, _}] ->
        pid =
          case :ets.lookup(:shard_procs, {shard_id, nil}) do
            [{{^shard_id, nil}, pid}] ->
              pid

            [] ->
              # No process is running: start one so that it can delete the
              # shard's data itself.
              {:ok, pid} = start_shard(manifest)
              pid
          end

        {:reply, :ok, delete_shard_data(state, shard_id, pid)}

      [{^shard_id, _, _, _}] ->
        # Pinned or required by another shard: refuse to delete.
        {:reply, {:error, :pinned}, state}
    end
  end

  def handle_cast({:dispatch_to, shard_id, path, pid}, state) do
    :ets.insert(:shard_procs, {{shard_id, path}, pid})
    # Sub-processes are not linked to the manager, so monitor them instead.
    if path != nil, do: Process.monitor(pid)
    {:noreply, Map.put(state, pid, {shard_id, path})}
  end

  def handle_cast({:peer_db_insert, shard_id, peer_info}, state) do
    :dets.insert(@peer_db, {shard_id, peer_info})
    {:noreply, state}
  end

  def handle_cast({:peer_db_delete, shard_id, peer_info}, state) do
    :dets.match_delete(@peer_db, {shard_id, peer_info})
    {:noreply, state}
  end

  def handle_cast({:save_state, shard_id, shst}, state) do
    case :dets.lookup(@shard_db, shard_id) do
      [{^shard_id, manifest, why_have_it, _old_state}] ->
        :dets.insert(@shard_db, {shard_id, manifest, why_have_it, shst})
        :dets.sync(@shard_db)

      [] ->
        # The shard was deleted before its state arrived: nothing to save.
        nil
    end

    {:noreply, state}
  end

  def handle_cast({:pin, shard_id}, state) do
    case :dets.lookup(@shard_db, shard_id) do
      [{^shard_id, manifest, {:cached, _}, shst}] ->
        :dets.insert(@shard_db, {shard_id, manifest, {:pinned, MapSet.new(), MapSet.new()}, shst})
        {pid, state} = find_or_start(state, shard_id, manifest)
        # Ask the shard for its dependencies so they get linked as :req.
        GenServer.cast(pid, :send_deps)
        {:noreply, state}

      [{^shard_id, manifest, {:req, a, b}, shst}] ->
        :dets.insert(@shard_db, {shard_id, manifest, {:pinned, a, b}, shst})
        {:noreply, state}

      _ ->
        {:noreply, state}
    end
  end

  def handle_cast({:unpin, shard_id}, state) do
    case :dets.lookup(@shard_db, shard_id) do
      [{^shard_id, manifest, {:pinned, a, b}, shst}] ->
        if MapSet.size(a) > 0 do
          # Still required by other shards: downgrade to :req.
          :dets.insert(@shard_db, {shard_id, manifest, {:req, a, b}, shst})
        else
          for dep <- b, do: rm_dep_link(shard_id, dep)
          :dets.insert(@shard_db, {shard_id, manifest, cached(), shst})
        end

      _ ->
        nil
    end

    {:noreply, state}
  end

  def handle_cast({:dep_list, shard_id, manifests}, state) do
    case :dets.lookup(@shard_db, shard_id) do
      [{^shard_id, manifest, {reason, a, b}, shst}] when reason in [:pinned, :req] ->
        new_deps = Map.new(manifests, fn m -> {SData.term_hash(m), m} end)
        new_ids = new_deps |> Map.keys() |> MapSet.new()

        state =
          new_ids
          |> MapSet.difference(b)
          |> Enum.reduce(state, fn id, state ->
            add_dep_link(state, shard_id, id, Map.fetch!(new_deps, id))
          end)

        for id <- MapSet.difference(b, new_ids), do: rm_dep_link(shard_id, id)

        :dets.insert(@shard_db, {shard_id, manifest, {reason, a, new_ids}, shst})
        {:noreply, state}

      _ ->
        {:noreply, state}
    end
  end

  def handle_info(:clean_cache, state) do
    now = System.os_time(:second)

    # The match-spec body [:"$1"] makes :dets.select return the bare ids
    # (not one-element lists) of cached entries whose expiry date has passed.
    expired =
      :dets.select(@shard_db, [
        {{:"$1", :_, {:cached, :"$2"}, :_}, [{:<, :"$2", now}], [:"$1"]}
      ])

    state =
      Enum.reduce(expired, state, fn id, state ->
        case :ets.lookup(:shard_procs, {id, nil}) do
          [{{^id, nil}, pid}] ->
            delete_shard_data(state, id, pid)

          [] ->
            :dets.delete(@shard_db, id)
            state
        end
      end)

    Process.send_after(self(), :clean_cache, @clean_cache_every * 1000)
    {:noreply, state}
  end

  def handle_info({:DOWN, _, :process, pid, reason}, state) do
    handle_info({:EXIT, pid, reason}, state)
  end

  def handle_info({:EXIT, pid, _reason}, state) do
    case Map.pop(state, pid) do
      {nil, state} ->
        {:noreply, state}

      {key, state} ->
        # Remove the entry only if it still points at this pid: a newer
        # process may have been registered under the same key since.
        :ets.delete_object(:shard_procs, {key, pid})
        {:noreply, state}
    end
  end

  # Returns {main_pid, state} for the shard, recording it in @shard_db (as
  # cached, if unknown) and starting its main process if none is running.
  defp find_or_start(state, shard_id, manifest) do
    remember_shard(shard_id, manifest)

    case :ets.lookup(:shard_procs, {shard_id, nil}) do
      [{{^shard_id, nil}, pid}] ->
        {pid, state}

      [] ->
        {:ok, pid} = start_shard(manifest)
        GenServer.cast(pid, :send_deps)
        :ets.insert(:shard_procs, {{shard_id, nil}, pid})
        {pid, Map.put(state, pid, {shard_id, nil})}
    end
  end

  # Asserts the manifest is valid, then inserts the shard as cached if it is
  # unknown, or refreshes its expiry date if it is already cached.
  defp remember_shard(shard_id, manifest) do
    true = Shard.Manifest.is_valid?(manifest)

    case :dets.lookup(@shard_db, shard_id) do
      [] ->
        :dets.insert(@shard_db, {shard_id, manifest, cached(), nil})

      [{^shard_id, ^manifest, {:cached, _}, shst}] ->
        :dets.insert(@shard_db, {shard_id, manifest, cached(), shst})

      _ ->
        nil
    end
  end

  # Starts (and links to the manager) the main process for a manifest.
  defp start_shard(manifest) do
    Shard.Manifest.module(manifest).start_link(manifest)
  end

  # Asks the main process `pid` to delete the shard's data, then forgets the
  # shard: its @shard_db entry, its main-process :shard_procs entry, and the
  # pid's entry in the manager state.
  defp delete_shard_data(state, shard_id, pid) do
    GenServer.call(pid, :delete_shard)
    :ets.delete(:shard_procs, {shard_id, nil})
    :dets.delete(@shard_db, shard_id)
    Map.delete(state, pid)
  end

  # A fresh :cached reason, expiring @cache_ttl seconds from now.
  defp cached() do
    {:cached, System.os_time(:second) + @cache_ttl}
  end

  # Records that `shard_id` requires `id2` (with manifest `m2`), promoting
  # `id2` to :req and starting it if it was not already pinned/required.
  defp add_dep_link(state, shard_id, id2, m2) do
    case :dets.lookup(@shard_db, id2) do
      [{^id2, ^m2, {reason, a, b}, shst}] when reason in [:pinned, :req] ->
        :dets.insert(@shard_db, {id2, m2, {reason, MapSet.put(a, shard_id), b}, shst})
        state

      existing ->
        # Keep any state previously saved for a cached copy of this shard.
        shst =
          case existing do
            [{^id2, ^m2, _, saved}] -> saved
            _ -> nil
          end

        :dets.insert(@shard_db, {id2, m2, {:req, MapSet.new([shard_id]), MapSet.new()}, shst})
        {pid, state} = find_or_start(state, id2, m2)
        GenServer.cast(pid, :send_deps)
        state
    end
  end

  # Records that `shard_id` no longer requires `id2`. A :req shard that is no
  # longer required by anyone becomes cached and releases its own deps.
  defp rm_dep_link(shard_id, id2) do
    case :dets.lookup(@shard_db, id2) do
      [{^id2, m2, {reason, a, b}, shst}] when reason in [:pinned, :req] ->
        a2 = MapSet.delete(a, shard_id)

        if reason == :req and MapSet.size(a2) == 0 do
          :dets.insert(@shard_db, {id2, m2, cached(), shst})
          for dep <- b, do: rm_dep_link(id2, dep)
        else
          :dets.insert(@shard_db, {id2, m2, {reason, a2, b}, shst})
        end

      _ ->
        # Dependency already deleted or cached (e.g. a dependency cycle that
        # was already unwound): nothing to unlink.
        nil
    end
  end

  # ======================
  # CALLED BY SNet.TcpConn
  # ======================

  @doc """
  Dispatch incoming message to correct shard process
  """
  def incoming(conn_pid, peer_info, auth, {:interested, shards}) do
    for shard_id <- shards do
      case :dets.lookup(@shard_db, shard_id) do
        [{^shard_id, manifest, _, _}] ->
          GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info})

          pid =
            case :ets.lookup(:shard_procs, {shard_id, nil}) do
              [] -> GenServer.call(__MODULE__, {:find_or_start, shard_id, manifest})
              [{{^shard_id, nil}, pid}] -> pid
            end

          GenServer.cast(pid, {:interested, conn_pid, auth})

        [] ->
          nil
      end
    end
  end

  def incoming(_conn_pid, peer_info, _auth, {:not_interested, shard}) do
    GenServer.cast(__MODULE__, {:peer_db_delete, shard, peer_info})
  end

  def incoming(conn_pid, peer_info, auth, {shard_id, path, msg}) do
    case :dets.lookup(@shard_db, shard_id) do
      [] ->
        GenServer.cast(conn_pid, {:send_msg, {:not_interested, shard_id}})

      [{^shard_id, manifest, _, _}] ->
        GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info})

        # Fall back to the shard's main process when no handler is
        # registered for this path.
        pid =
          case :ets.lookup(:shard_procs, {shard_id, path}) do
            [] -> GenServer.call(__MODULE__, {:find_or_start, shard_id, manifest})
            [{{^shard_id, ^path}, pid}] -> pid
          end

        GenServer.cast(pid, {:msg, conn_pid, auth, shard_id, path, msg})
    end
  end

  # ================
  # CALLED BY Sapp.*
  # ================

  @doc """
  Register a process as the main process for a shard.

  Returns either `:ok` or `:redundant`, in which case the process must exit.
  """
  def register(shard_id, manifest, pid) do
    GenServer.call(__MODULE__, {:register, shard_id, manifest, pid})
  end

  @doc """
  Register a process as the handler for shard packets for a given path.
  """
  def dispatch_to(shard_id, path, pid) do
    GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid})
  end

  @doc """
  Return the list of all peer info for peers that are interested in a certain shard
  """
  def get_shard_peers(shard_id) do
    for {_, peer_info} <- :dets.lookup(@peer_db, shard_id), do: peer_info
  end

  @doc """
  Return the saved state value for a shard
  """
  def load_state(shard_id) do
    case :dets.lookup(@shard_db, shard_id) do
      [{^shard_id, _, _, state}] -> state
      _ -> nil
    end
  end

  @doc """
  Save a state value for a shard
  """
  def save_state(shard_id, state) do
    GenServer.cast(__MODULE__, {:save_state, shard_id, state})
  end

  # ================
  # CALLED BY ANYONE
  # ================

  @doc """
  Returns the pid for a shard if it exists
  """
  def find_proc(shard_id) do
    case :ets.lookup(:shard_procs, {shard_id, nil}) do
      [{{^shard_id, _}, pid}] -> pid
      _ -> nil
    end
  end

  @doc """
  Returns the pid for a shard defined by its manifest.
  Start it if it doesn't exist.
  """
  def find_or_start(manifest) do
    id = SData.term_hash(manifest)

    case :ets.lookup(:shard_procs, {id, nil}) do
      [{{^id, nil}, pid}] -> pid
      [] -> GenServer.call(__MODULE__, {:find_or_start, id, manifest})
    end
  end

  @doc """
  Delete a shard
  """
  def delete(shard_id) do
    GenServer.call(__MODULE__, {:delete, shard_id})
  end

  @doc """
  Return the list of all shards. Returns a list of tuples:

      {id, manifest, why_have_it}
  """
  def list_shards() do
    for [{id, m, why, _}] <- :dets.match(@shard_db, :"$1"), do: {id, m, why}
  end

  @doc """
  Check if we are storing this shard
  """
  def have_shard?(shard_id) do
    :dets.lookup(@shard_db, shard_id) != []
  end
end