diff options
Diffstat (limited to 'lib/manager.ex')
-rw-r--r-- | lib/manager.ex | 243 |
1 files changed, 0 insertions, 243 deletions
diff --git a/lib/manager.ex b/lib/manager.ex deleted file mode 100644 index 82984d6..0000000 --- a/lib/manager.ex +++ /dev/null @@ -1,243 +0,0 @@ -defmodule Shard.Manager do - @moduledoc""" - Maintains several important tables : - - - :peer_db - - List of - { id, {conn_pid, con_start, conn_n_msg} | nil, ip, port, last_seen } - - - :shard_db - - List of - { id, manifest, pid | nil } - - - :shard_procs - - List of - { {id, path}, pid } - - - :shard_peer_db - - Mult-list of - { shard_id, peer_id } - - - And an internal table : - - - :outbox - - Multi-list of - { peer_id, message } - - """ - - use GenServer - - require Logger - - def start_link(my_port) do - GenServer.start_link(__MODULE__, my_port, name: __MODULE__) - end - - def init(my_port) do - :ets.new(:peer_db, [:set, :protected, :named_table]) - :ets.new(:shard_db, [:set, :protected, :named_table]) - :ets.new(:shard_procs, [:set, :protected, :named_table]) - :ets.new(:shard_peer_db, [:bag, :protected, :named_table]) - outbox = :ets.new(:outbox, [:bag, :private]) - - {:ok, %{my_port: my_port, outbox: outbox} } - end - - def handle_call({:register, shard_id, manifest, pid}, _from, state) do - will_live = case :ets.lookup(:shard_db, shard_id) do - [{ ^shard_id, _, pid }] -> not Process.alive?(pid) - _ -> true - end - reply = if will_live do - Process.monitor(pid) - :ets.insert(:shard_db, {shard_id, manifest, pid}) - :ok - else - :redundant - end - {:reply, reply, state} - end - - def handle_cast({:dispatch_to, shard_id, path, pid}, state) do - :ets.insert(:shard_procs, { {shard_id, path}, pid }) - Process.monitor(pid) - {:noreply, state} - end - - def handle_cast({:interested, peer_id, shards}, state) do - for shard_id <- shards do - case :ets.lookup(:shard_db, shard_id) do - [{ ^shard_id, _, pid }] -> - :ets.insert(:shard_peer_db, {shard_id, peer_id}) - GenServer.cast(pid, {:interested, peer_id}) - [] -> nil - end - end - {:noreply, state} - end - - def handle_cast({:not_interested, peer_id, shard_id}, state) do - :ets.match_delete(:shard_peer_db, {shard_id, peer_id}) - {:noreply, state} - end - - def handle_cast({:shard_peer_db_insert, shard_id, peer_id}, state) do - :ets.insert(:shard_peer_db, {shard_id, peer_id}) - {:noreply, state} - end - - def handle_cast({:peer_up, pk, pid, ip, port}, state) do - for [pk2] <- :ets.match(:peer_db, {:'$1', :_, ip, port}) do - if pk2 != pk do - # obsolete peer information - :ets.delete(:peer_db, pk2) - :ets.match_delete(:shard_peer_db, {:_, pk2}) - end - end - :ets.insert(:peer_db, {pk, pid, ip, port}) - - # Send interested message for all our shards - id_list = (for {id, _, _} <- :ets.tab2list(:shard_db), do: id) - GenServer.cast(pid, {:send_msg, {:interested, id_list}}) - - # Send queued messages - for {_, msg, _} <- :ets.lookup(state.outbox, pk) do - GenServer.cast(pid, {:send_msg, msg}) - end - :ets.delete(state.outbox, pk) - - {:noreply, state} - end - - def handle_cast({:peer_down, pk, ip, port}, state) do - :ets.insert(:peer_db, {pk, nil, ip, port}) - {:noreply, state} - end - - def handle_cast({:connect_and_send, peer_id, msg}, state) do - case :ets.lookup(:peer_db, peer_id) do - [{^peer_id, nil, ip, port}] -> - add_peer(ip, port, state) - currtime = System.os_time :second - :ets.insert(state.outbox, {peer_id, msg, currtime}) - outbox_cleanup = [{{:_, :_, :'$1'}, - [{:<, :'$1', currtime - 60}], - [:'$1']}] - :ets.select_delete(state.outbox, outbox_cleanup) - _ -> - Logger.info "Dropping message #{inspect msg} for peer #{inspect peer_id}: peer not in database" - end - {:noreply, state} - end - - def handle_cast({:try_connect, pk_list}, state) do - for pk <- pk_list do - case :ets.lookup(:peer_db, pk) do - [{^pk, nil, ip, port}] -> - add_peer(ip, port, state) - _ -> nil - end - end - {:noreply, state} - end - - def handle_cast({:add_peer, ip, port}, state) do - add_peer(ip, port, state) - {:noreply, state} - end - - def handle_info({:DOWN, _, :process, pid, _}, state) do - :ets.match_delete(:shard_procs, {:_, pid}) - {:noreply, state} - end - - defp add_peer(ip, port, state) do - spawn fn -> - case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do - {:ok, client} -> - {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}}) - :ok = :gen_tcp.controlling_process(client, pid) - _ -> - Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped" - end - end - end - - - # ================ - # PUBLIC INTERFACE - # ================ - - - @doc""" - Connect to a peer specified by ip address and port - """ - def add_peer(ip, port) do - GenServer.cast(__MODULE__, {:add_peer, ip, port}) - end - - @doc""" - Send message to a peer specified by peer id - """ - def send(peer_id, msg) do - case :ets.lookup(:peer_db, peer_id) do - [{ ^peer_id, pid, _, _}] when pid != nil-> - GenServer.cast(pid, {:send_msg, msg}) - _ -> - GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg}) - end - end - - @doc""" - Dispatch incoming message to correct shard process - """ - def dispatch(peer_id, {shard_id, path, msg}) do - case :ets.lookup(:shard_db, shard_id) do - [] -> - __MODULE__.send(peer_id, {:not_interested, shard_id}) - [_] -> - case :ets.match(:shard_peer_db, {shard_id, peer_id}) do - [] -> - GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_id}) - _ -> nil - end - case :ets.lookup(:shard_procs, {shard_id, path}) do - [{ {^shard_id, ^path}, pid }] -> - GenServer.cast(pid, {:msg, peer_id, shard_id, path, msg}) - [] -> - Logger.info("Warning: dropping message for #{inspect shard_id}/#{inspect path}, no handler running.\n\t#{inspect msg}") - end - end - end - - def dispatch(peer_id, {:interested, shards}) do - GenServer.cast(__MODULE__, {:interested, peer_id, shards}) - end - - def dispatch(peer_id, {:not_interested, shard}) do - GenServer.cast(__MODULE__, {:not_interested, peer_id, shard}) - end - - @doc""" - Register a process as the main process for a shard. - - Returns either :ok or :redundant, in which case the process must exit. - """ - def register(shard_id, manifest, pid) do - GenServer.call(__MODULE__, {:register, shard_id, manifest, pid}) - end - - @doc""" - Register a process as the handler for shard packets for a given path. - """ - def dispatch_to(shard_id, path, pid) do - GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid}) - end -end |