defprotocol Shard.Manifest do
  @doc "Start the corresponding Shard process"
  def start(manifest)
end

defmodule Shard.Manager do
  @moduledoc """
  Maintains several important tables :

  - :peer_db

      List of { id, pid | nil, ip, port }

  - :shard_db

      List of { id, manifest, pid | nil }

  - :shard_procs

      List of { {id, path}, pid }

  - :shard_peer_db

      Multi-list of { shard_id, peer_id }

  And an internal table :

  - :outbox

      Multi-list of { peer_id, message, time_inserted }

  `:peer_db`, `:shard_db` and `:shard_peer_db` are DETS tables, `:shard_procs`
  is a named ETS table and `:outbox` is a private ETS table held in the server
  state.
  """

  use GenServer

  require Logger

  # NOTE: these attributes are evaluated when the module is compiled, so the
  # value of `:shard, :data_path` at compile time determines the table names.
  @peer_db [Application.get_env(:shard, :data_path), "peer_db"]
           |> Path.join()
           |> String.to_atom()
  @shard_db [Application.get_env(:shard, :data_path), "shard_db"]
            |> Path.join()
            |> String.to_atom()
  @shard_peer_db [Application.get_env(:shard, :data_path), "shard_peer_db"]
                 |> Path.join()
                 |> String.to_atom()

  @doc """
  Start the manager, registered under the module name.

  `my_port` is kept in the server state and handed to every outgoing
  connection process started by the manager.
  """
  def start_link(my_port) do
    GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
  end

  # Opens the tables, clears every pid stored in :peer_db and :shard_db,
  # then asks for a reconnection to each known peer and restarts each known
  # shard.
  @impl true
  def init(my_port) do
    :dets.open_file(@peer_db, type: :set)

    for [{id, _pid, ip, port}] <- :dets.match(@peer_db, :"$1") do
      :dets.insert(@peer_db, {id, nil, ip, port})
      # connect blindly to everyone
      # (add_peer/2 only casts to this server, so the connection attempt
      # happens after init/1 has returned)
      add_peer(ip, port)
    end

    :dets.open_file(@shard_db, type: :set)

    for [{id, manifest, _pid}] <- :dets.match(@shard_db, :"$1") do
      :dets.insert(@shard_db, {id, manifest, nil})
      spawn(fn -> Shard.Manifest.start(manifest) end)
    end

    :dets.open_file(@shard_peer_db, type: :bag)

    :ets.new(:shard_procs, [:set, :protected, :named_table])
    outbox = :ets.new(:outbox, [:bag, :private])

    {:ok, %{my_port: my_port, outbox: outbox}}
  end

  # Registers `pid` as the main process of `shard_id`, unless the process
  # currently recorded for that shard is still alive.
  @impl true
  def handle_call({:register, shard_id, manifest, pid}, _from, state) do
    will_live =
      case :dets.lookup(@shard_db, shard_id) do
        [{^shard_id, _, old_pid}] when old_pid != nil -> not Process.alive?(old_pid)
        _ -> true
      end

    reply =
      if will_live do
        Process.monitor(pid)
        :dets.insert(@shard_db, {shard_id, manifest, pid})
        :ok
      else
        :redundant
      end

    {:reply, reply, state}
  end

  # Records `pid` as the handler for {shard_id, path}; the monitor lets the
  # :DOWN handler remove the entry when the process exits.
  @impl true
  def handle_cast({:dispatch_to, shard_id, path, pid}, state) do
    :ets.insert(:shard_procs, {{shard_id, path}, pid})
    Process.monitor(pid)
    {:noreply, state}
  end

  # A peer announced interest in `shards`: remember it for every shard we
  # know and notify the shard process, if one is recorded.
  def handle_cast({:interested, peer_id, shards}, state) do
    for shard_id <- shards do
      case :dets.lookup(@shard_db, shard_id) do
        [{^shard_id, _, pid}] ->
          :dets.insert(@shard_peer_db, {shard_id, peer_id})
          # pid is nil while no process is recorded for the shard
          if pid != nil, do: GenServer.cast(pid, {:interested, peer_id})

        [] ->
          nil
      end
    end

    {:noreply, state}
  end

  def handle_cast({:not_interested, peer_id, shard_id}, state) do
    :dets.match_delete(@shard_peer_db, {shard_id, peer_id})
    {:noreply, state}
  end

  def handle_cast({:shard_peer_db_insert, shard_id, peer_id}, state) do
    :dets.insert(@shard_peer_db, {shard_id, peer_id})
    {:noreply, state}
  end

  # A connection to peer `pk` is up: drop other ids stored for the same
  # ip/port, record the connection pid, announce our shards and flush the
  # messages queued for that peer.
  def handle_cast({:peer_up, pk, pid, ip, port}, state) do
    for [pk2] <- :dets.match(@peer_db, {:"$1", :_, ip, port}) do
      if pk2 != pk do
        # obsolete peer information
        :dets.delete(@peer_db, pk2)
        :dets.match_delete(@shard_peer_db, {:_, pk2})
      end
    end

    :dets.insert(@peer_db, {pk, pid, ip, port})

    # Send interested message for all our shards
    id_list = for [{id, _, _}] <- :dets.match(@shard_db, :"$1"), do: id
    GenServer.cast(pid, {:send_msg, {:interested, id_list}})

    # Send queued messages
    for {_, msg, _} <- :ets.lookup(state.outbox, pk) do
      GenServer.cast(pid, {:send_msg, msg})
    end

    :ets.delete(state.outbox, pk)

    {:noreply, state}
  end

  def handle_cast({:peer_down, pk, ip, port}, state) do
    :dets.insert(@peer_db, {pk, nil, ip, port})
    {:noreply, state}
  end

  # Queues `msg` for a known but disconnected peer and starts a connection
  # attempt. Outbox entries older than 60 seconds are purged on each call.
  def handle_cast({:connect_and_send, peer_id, msg}, state) do
    case :dets.lookup(@peer_db, peer_id) do
      [{^peer_id, nil, ip, port}] ->
        add_peer(ip, port, state)
        currtime = System.os_time(:second)
        :ets.insert(state.outbox, {peer_id, msg, currtime})
        outbox_cleanup = [{{:_, :_, :"$1"}, [{:<, :"$1", currtime - 60}], [true]}]
        :ets.select_delete(state.outbox, outbox_cleanup)

      [{^peer_id, pid, _ip, _port}] ->
        # send/2 saw no connection when it looked the peer up, but :peer_up
        # was handled before this cast: deliver directly instead of dropping.
        GenServer.cast(pid, {:send_msg, msg})

      _ ->
        Logger.info(
          "Dropping message #{inspect(msg)} for peer #{inspect(peer_id)}: peer not in database"
        )
    end

    {:noreply, state}
  end

  def handle_cast({:try_connect, pk_list}, state) do
    for pk <- pk_list do
      case :dets.lookup(@peer_db, pk) do
        [{^pk, nil, ip, port}] -> add_peer(ip, port, state)
        _ -> nil
      end
    end

    {:noreply, state}
  end

  def handle_cast({:add_peer, ip, port}, state) do
    add_peer(ip, port, state)
    {:noreply, state}
  end

  # A monitored process exited: forget every path handler it had registered.
  @impl true
  def handle_info({:DOWN, _, :process, pid, _}, state) do
    :ets.match_delete(:shard_procs, {:_, pid})
    {:noreply, state}
  end

  # Any other message is ignored rather than crashing the manager (which
  # would destroy the :shard_procs and :outbox ETS tables it owns).
  def handle_info(msg, state) do
    Logger.debug("Shard.Manager ignoring unexpected message: #{inspect(msg)}")
    {:noreply, state}
  end

  # Connects to ip:port in a separate process so the manager is not blocked,
  # then hands the socket over to a supervised SNet.TCPConn process.
  defp add_peer(ip, port, state) do
    spawn(fn ->
      case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do
        {:ok, client} ->
          {:ok, pid} =
            DynamicSupervisor.start_child(
              Shard.DynamicSupervisor,
              {SNet.TCPConn, %{socket: client, my_port: state.my_port}}
            )

          :ok = :gen_tcp.controlling_process(client, pid)

        _ ->
          Logger.info(
            "Could not connect to #{inspect(ip)}:#{port}, some messages may be dropped"
          )
      end
    end)
  end

  # ================
  # PUBLIC INTERFACE
  # ================

  @doc """
  Connect to a peer specified by ip address and port
  """
  def add_peer(ip, port) do
    GenServer.cast(__MODULE__, {:add_peer, ip, port})
  end

  @doc """
  Send message to a peer specified by peer id

  If a connection process is recorded for the peer the message is handed to
  it directly; otherwise the manager is asked to connect and queue the message.
  """
  def send(peer_id, msg) do
    case :dets.lookup(@peer_db, peer_id) do
      [{^peer_id, pid, _, _}] when pid != nil ->
        GenServer.cast(pid, {:send_msg, msg})

      _ ->
        GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg})
    end
  end

  @doc """
  Dispatch incoming message to correct shard process

  `{shard_id, path, msg}` is forwarded to the handler registered for
  `{shard_id, path}`; when the shard is unknown, `{:not_interested, shard_id}`
  is sent back to the peer. `{:interested, shards}` and
  `{:not_interested, shard}` are forwarded to the manager.
  """
  def dispatch(peer_id, {shard_id, path, msg}) do
    case :dets.lookup(@shard_db, shard_id) do
      [] ->
        # qualified call: an unqualified send/2 would clash with Kernel.send/2
        __MODULE__.send(peer_id, {:not_interested, shard_id})

      [_] ->
        case :dets.match(@shard_peer_db, {shard_id, peer_id}) do
          [] -> GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_id})
          _ -> nil
        end

        case :ets.lookup(:shard_procs, {shard_id, path}) do
          [{{^shard_id, ^path}, pid}] ->
            GenServer.cast(pid, {:msg, peer_id, shard_id, path, msg})

          [] ->
            Logger.info(
              "Warning: dropping message for #{inspect(shard_id)}/#{inspect(path)}, no handler running.\n\t#{inspect(msg)}"
            )
        end
    end
  end

  def dispatch(peer_id, {:interested, shards}) do
    GenServer.cast(__MODULE__, {:interested, peer_id, shards})
  end

  def dispatch(peer_id, {:not_interested, shard}) do
    GenServer.cast(__MODULE__, {:not_interested, peer_id, shard})
  end

  @doc """
  Register a process as the main process for a shard.

  Returns either :ok or :redundant, in which case the process must exit.
  """
  def register(shard_id, manifest, pid) do
    GenServer.call(__MODULE__, {:register, shard_id, manifest, pid})
  end

  @doc """
  Register a process as the handler for shard packets for a given path.
  """
  def dispatch_to(shard_id, path, pid) do
    GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid})
  end

  @doc """
  Return every `{id, manifest, pid | nil}` entry of the shard table.
  """
  def list_shards() do
    for [x] <- :dets.match(@shard_db, :"$1"), do: x
  end

  @doc """
  Return every `{id, pid | nil, ip, port}` entry of the peer table.
  """
  def list_peers() do
    for [x] <- :dets.match(@peer_db, :"$1"), do: x
  end

  @doc """
  Return the ids of the peers recorded for `shard_id`.
  """
  def get_shard_peers(shard_id) do
    for [x] <- :dets.match(@shard_peer_db, {shard_id, :"$1"}), do: x
  end
end