aboutsummaryrefslogtreecommitdiff
path: root/lib/manager.ex
diff options
context:
space:
mode:
Diffstat (limited to 'lib/manager.ex')
-rw-r--r--lib/manager.ex243
1 files changed, 0 insertions, 243 deletions
diff --git a/lib/manager.ex b/lib/manager.ex
deleted file mode 100644
index 82984d6..0000000
--- a/lib/manager.ex
+++ /dev/null
@@ -1,243 +0,0 @@
-defmodule Shard.Manager do
- @moduledoc"""
- Maintains several important tables :
-
- - :peer_db
-
- List of
- { id, {conn_pid, con_start, conn_n_msg} | nil, ip, port, last_seen }
-
- - :shard_db
-
- List of
- { id, manifest, pid | nil }
-
- - :shard_procs
-
- List of
- { {id, path}, pid }
-
- - :shard_peer_db
-
- Mult-list of
- { shard_id, peer_id }
-
-
- And an internal table :
-
- - :outbox
-
- Multi-list of
- { peer_id, message }
-
- """
-
- use GenServer
-
- require Logger
-
- def start_link(my_port) do
- GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
- end
-
- def init(my_port) do
- :ets.new(:peer_db, [:set, :protected, :named_table])
- :ets.new(:shard_db, [:set, :protected, :named_table])
- :ets.new(:shard_procs, [:set, :protected, :named_table])
- :ets.new(:shard_peer_db, [:bag, :protected, :named_table])
- outbox = :ets.new(:outbox, [:bag, :private])
-
- {:ok, %{my_port: my_port, outbox: outbox} }
- end
-
- def handle_call({:register, shard_id, manifest, pid}, _from, state) do
- will_live = case :ets.lookup(:shard_db, shard_id) do
- [{ ^shard_id, _, pid }] -> not Process.alive?(pid)
- _ -> true
- end
- reply = if will_live do
- Process.monitor(pid)
- :ets.insert(:shard_db, {shard_id, manifest, pid})
- :ok
- else
- :redundant
- end
- {:reply, reply, state}
- end
-
- def handle_cast({:dispatch_to, shard_id, path, pid}, state) do
- :ets.insert(:shard_procs, { {shard_id, path}, pid })
- Process.monitor(pid)
- {:noreply, state}
- end
-
- def handle_cast({:interested, peer_id, shards}, state) do
- for shard_id <- shards do
- case :ets.lookup(:shard_db, shard_id) do
- [{ ^shard_id, _, pid }] ->
- :ets.insert(:shard_peer_db, {shard_id, peer_id})
- GenServer.cast(pid, {:interested, peer_id})
- [] -> nil
- end
- end
- {:noreply, state}
- end
-
- def handle_cast({:not_interested, peer_id, shard_id}, state) do
- :ets.match_delete(:shard_peer_db, {shard_id, peer_id})
- {:noreply, state}
- end
-
- def handle_cast({:shard_peer_db_insert, shard_id, peer_id}, state) do
- :ets.insert(:shard_peer_db, {shard_id, peer_id})
- {:noreply, state}
- end
-
- def handle_cast({:peer_up, pk, pid, ip, port}, state) do
- for [pk2] <- :ets.match(:peer_db, {:'$1', :_, ip, port}) do
- if pk2 != pk do
- # obsolete peer information
- :ets.delete(:peer_db, pk2)
- :ets.match_delete(:shard_peer_db, {:_, pk2})
- end
- end
- :ets.insert(:peer_db, {pk, pid, ip, port})
-
- # Send interested message for all our shards
- id_list = (for {id, _, _} <- :ets.tab2list(:shard_db), do: id)
- GenServer.cast(pid, {:send_msg, {:interested, id_list}})
-
- # Send queued messages
- for {_, msg, _} <- :ets.lookup(state.outbox, pk) do
- GenServer.cast(pid, {:send_msg, msg})
- end
- :ets.delete(state.outbox, pk)
-
- {:noreply, state}
- end
-
- def handle_cast({:peer_down, pk, ip, port}, state) do
- :ets.insert(:peer_db, {pk, nil, ip, port})
- {:noreply, state}
- end
-
- def handle_cast({:connect_and_send, peer_id, msg}, state) do
- case :ets.lookup(:peer_db, peer_id) do
- [{^peer_id, nil, ip, port}] ->
- add_peer(ip, port, state)
- currtime = System.os_time :second
- :ets.insert(state.outbox, {peer_id, msg, currtime})
- outbox_cleanup = [{{:_, :_, :'$1'},
- [{:<, :'$1', currtime - 60}],
- [:'$1']}]
- :ets.select_delete(state.outbox, outbox_cleanup)
- _ ->
- Logger.info "Dropping message #{inspect msg} for peer #{inspect peer_id}: peer not in database"
- end
- {:noreply, state}
- end
-
- def handle_cast({:try_connect, pk_list}, state) do
- for pk <- pk_list do
- case :ets.lookup(:peer_db, pk) do
- [{^pk, nil, ip, port}] ->
- add_peer(ip, port, state)
- _ -> nil
- end
- end
- {:noreply, state}
- end
-
- def handle_cast({:add_peer, ip, port}, state) do
- add_peer(ip, port, state)
- {:noreply, state}
- end
-
- def handle_info({:DOWN, _, :process, pid, _}, state) do
- :ets.match_delete(:shard_procs, {:_, pid})
- {:noreply, state}
- end
-
- defp add_peer(ip, port, state) do
- spawn fn ->
- case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do
- {:ok, client} ->
- {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}})
- :ok = :gen_tcp.controlling_process(client, pid)
- _ ->
- Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped"
- end
- end
- end
-
-
- # ================
- # PUBLIC INTERFACE
- # ================
-
-
- @doc"""
- Connect to a peer specified by ip address and port
- """
- def add_peer(ip, port) do
- GenServer.cast(__MODULE__, {:add_peer, ip, port})
- end
-
- @doc"""
- Send message to a peer specified by peer id
- """
- def send(peer_id, msg) do
- case :ets.lookup(:peer_db, peer_id) do
- [{ ^peer_id, pid, _, _}] when pid != nil->
- GenServer.cast(pid, {:send_msg, msg})
- _ ->
- GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg})
- end
- end
-
- @doc"""
- Dispatch incoming message to correct shard process
- """
- def dispatch(peer_id, {shard_id, path, msg}) do
- case :ets.lookup(:shard_db, shard_id) do
- [] ->
- __MODULE__.send(peer_id, {:not_interested, shard_id})
- [_] ->
- case :ets.match(:shard_peer_db, {shard_id, peer_id}) do
- [] ->
- GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_id})
- _ -> nil
- end
- case :ets.lookup(:shard_procs, {shard_id, path}) do
- [{ {^shard_id, ^path}, pid }] ->
- GenServer.cast(pid, {:msg, peer_id, shard_id, path, msg})
- [] ->
- Logger.info("Warning: dropping message for #{inspect shard_id}/#{inspect path}, no handler running.\n\t#{inspect msg}")
- end
- end
- end
-
- def dispatch(peer_id, {:interested, shards}) do
- GenServer.cast(__MODULE__, {:interested, peer_id, shards})
- end
-
- def dispatch(peer_id, {:not_interested, shard}) do
- GenServer.cast(__MODULE__, {:not_interested, peer_id, shard})
- end
-
- @doc"""
- Register a process as the main process for a shard.
-
- Returns either :ok or :redundant, in which case the process must exit.
- """
- def register(shard_id, manifest, pid) do
- GenServer.call(__MODULE__, {:register, shard_id, manifest, pid})
- end
-
- @doc"""
- Register a process as the handler for shard packets for a given path.
- """
- def dispatch_to(shard_id, path, pid) do
- GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid})
- end
-end