diff options
Diffstat (limited to 'shard/lib/manager.ex')
-rw-r--r-- | shard/lib/manager.ex | 243 |
1 files changed, 243 insertions, 0 deletions
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex new file mode 100644 index 0000000..82984d6 --- /dev/null +++ b/shard/lib/manager.ex @@ -0,0 +1,243 @@ +defmodule Shard.Manager do + @moduledoc""" + Maintains several important tables : + + - :peer_db + + List of + { id, {conn_pid, con_start, conn_n_msg} | nil, ip, port, last_seen } + + - :shard_db + + List of + { id, manifest, pid | nil } + + - :shard_procs + + List of + { {id, path}, pid } + + - :shard_peer_db + + Mult-list of + { shard_id, peer_id } + + + And an internal table : + + - :outbox + + Multi-list of + { peer_id, message } + + """ + + use GenServer + + require Logger + + def start_link(my_port) do + GenServer.start_link(__MODULE__, my_port, name: __MODULE__) + end + + def init(my_port) do + :ets.new(:peer_db, [:set, :protected, :named_table]) + :ets.new(:shard_db, [:set, :protected, :named_table]) + :ets.new(:shard_procs, [:set, :protected, :named_table]) + :ets.new(:shard_peer_db, [:bag, :protected, :named_table]) + outbox = :ets.new(:outbox, [:bag, :private]) + + {:ok, %{my_port: my_port, outbox: outbox} } + end + + def handle_call({:register, shard_id, manifest, pid}, _from, state) do + will_live = case :ets.lookup(:shard_db, shard_id) do + [{ ^shard_id, _, pid }] -> not Process.alive?(pid) + _ -> true + end + reply = if will_live do + Process.monitor(pid) + :ets.insert(:shard_db, {shard_id, manifest, pid}) + :ok + else + :redundant + end + {:reply, reply, state} + end + + def handle_cast({:dispatch_to, shard_id, path, pid}, state) do + :ets.insert(:shard_procs, { {shard_id, path}, pid }) + Process.monitor(pid) + {:noreply, state} + end + + def handle_cast({:interested, peer_id, shards}, state) do + for shard_id <- shards do + case :ets.lookup(:shard_db, shard_id) do + [{ ^shard_id, _, pid }] -> + :ets.insert(:shard_peer_db, {shard_id, peer_id}) + GenServer.cast(pid, {:interested, peer_id}) + [] -> nil + end + end + {:noreply, state} + end + + def handle_cast({:not_interested, peer_id, shard_id}, state) do + :ets.match_delete(:shard_peer_db, {shard_id, peer_id}) + {:noreply, state} + end + + def handle_cast({:shard_peer_db_insert, shard_id, peer_id}, state) do + :ets.insert(:shard_peer_db, {shard_id, peer_id}) + {:noreply, state} + end + + def handle_cast({:peer_up, pk, pid, ip, port}, state) do + for [pk2] <- :ets.match(:peer_db, {:'$1', :_, ip, port}) do + if pk2 != pk do + # obsolete peer information + :ets.delete(:peer_db, pk2) + :ets.match_delete(:shard_peer_db, {:_, pk2}) + end + end + :ets.insert(:peer_db, {pk, pid, ip, port}) + + # Send interested message for all our shards + id_list = (for {id, _, _} <- :ets.tab2list(:shard_db), do: id) + GenServer.cast(pid, {:send_msg, {:interested, id_list}}) + + # Send queued messages + for {_, msg, _} <- :ets.lookup(state.outbox, pk) do + GenServer.cast(pid, {:send_msg, msg}) + end + :ets.delete(state.outbox, pk) + + {:noreply, state} + end + + def handle_cast({:peer_down, pk, ip, port}, state) do + :ets.insert(:peer_db, {pk, nil, ip, port}) + {:noreply, state} + end + + def handle_cast({:connect_and_send, peer_id, msg}, state) do + case :ets.lookup(:peer_db, peer_id) do + [{^peer_id, nil, ip, port}] -> + add_peer(ip, port, state) + currtime = System.os_time :second + :ets.insert(state.outbox, {peer_id, msg, currtime}) + outbox_cleanup = [{{:_, :_, :'$1'}, + [{:<, :'$1', currtime - 60}], + [:'$1']}] + :ets.select_delete(state.outbox, outbox_cleanup) + _ -> + Logger.info "Dropping message #{inspect msg} for peer #{inspect peer_id}: peer not in database" + end + {:noreply, state} + end + + def handle_cast({:try_connect, pk_list}, state) do + for pk <- pk_list do + case :ets.lookup(:peer_db, pk) do + [{^pk, nil, ip, port}] -> + add_peer(ip, port, state) + _ -> nil + end + end + {:noreply, state} + end + + def handle_cast({:add_peer, ip, port}, state) do + add_peer(ip, port, state) + {:noreply, state} + end + + def handle_info({:DOWN, _, :process, pid, _}, state) do + :ets.match_delete(:shard_procs, {:_, pid}) + {:noreply, state} + end + + defp add_peer(ip, port, state) do + spawn fn -> + case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do + {:ok, client} -> + {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}}) + :ok = :gen_tcp.controlling_process(client, pid) + _ -> + Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped" + end + end + end + + + # ================ + # PUBLIC INTERFACE + # ================ + + + @doc""" + Connect to a peer specified by ip address and port + """ + def add_peer(ip, port) do + GenServer.cast(__MODULE__, {:add_peer, ip, port}) + end + + @doc""" + Send message to a peer specified by peer id + """ + def send(peer_id, msg) do + case :ets.lookup(:peer_db, peer_id) do + [{ ^peer_id, pid, _, _}] when pid != nil-> + GenServer.cast(pid, {:send_msg, msg}) + _ -> + GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg}) + end + end + + @doc""" + Dispatch incoming message to correct shard process + """ + def dispatch(peer_id, {shard_id, path, msg}) do + case :ets.lookup(:shard_db, shard_id) do + [] -> + __MODULE__.send(peer_id, {:not_interested, shard_id}) + [_] -> + case :ets.match(:shard_peer_db, {shard_id, peer_id}) do + [] -> + GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_id}) + _ -> nil + end + case :ets.lookup(:shard_procs, {shard_id, path}) do + [{ {^shard_id, ^path}, pid }] -> + GenServer.cast(pid, {:msg, peer_id, shard_id, path, msg}) + [] -> + Logger.info("Warning: dropping message for #{inspect shard_id}/#{inspect path}, no handler running.\n\t#{inspect msg}") + end + end + end + + def dispatch(peer_id, {:interested, shards}) do + GenServer.cast(__MODULE__, {:interested, peer_id, shards}) + end + + def dispatch(peer_id, {:not_interested, shard}) do + GenServer.cast(__MODULE__, {:not_interested, peer_id, shard}) + end + + @doc""" + Register a process as the main process for a shard. + + Returns either :ok or :redundant, in which case the process must exit. + """ + def register(shard_id, manifest, pid) do + GenServer.call(__MODULE__, {:register, shard_id, manifest, pid}) + end + + @doc""" + Register a process as the handler for shard packets for a given path. + """ + def dispatch_to(shard_id, path, pid) do + GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid}) + end +end |