aboutsummaryrefslogtreecommitdiff
path: root/shard/lib/manager.ex
diff options
context:
space:
mode:
Diffstat (limited to 'shard/lib/manager.ex')
-rw-r--r--shard/lib/manager.ex147
1 files changed, 76 insertions, 71 deletions
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex
index d6b493b..3a0e21c 100644
--- a/shard/lib/manager.ex
+++ b/shard/lib/manager.ex
@@ -5,14 +5,18 @@ defprotocol Shard.Manifest do
The hash of the manifest is the unique identifier of that shard on the network.
The Manifest protocol is a protocol implemented by the manifest structs for the
- different shard types. It contains an operation start() that is able to launch the
- correct process for this shard and connect to other peers that use it.
+ different shard types. It contains an operation module() that returns the main module
+ for the shard processes. The module must contain a function with the signature:
+
+ {:ok, pid} = <module>.start_link(manifest)
+
+ that will be called when the shard must be started.
"""
@doc"""
- Start the corresponding Shard process
+ Get the module in question.
"""
- def start(manifest)
+ def module(manifest)
end
defmodule Shard.Manager do
@@ -22,19 +26,15 @@ defmodule Shard.Manager do
- :shard_db (persistent with DETS)
List of
- { id, manifest, pid | nil }
-
- - :shard_state (persistent with DETS)
-
- List of
- { id, state }
+ { id, manifest, state }
- :peer_db (persistent with DETS)
Mult-list of
{ shard_id, peer_info } # TODO: add health info (last seen, ping, etc)
- peer_info := {:inet, ip, port} | {:inet6, ip, port} | {:onion, name}
+ peer_info := {:inet, ip, port}
+ TODO peer_info |= {:inet6, ip, port} | {:onion, name}
- :shard_procs (not persistent)
@@ -47,7 +47,6 @@ defmodule Shard.Manager do
require Logger
@shard_db [Application.get_env(:shard, :data_path), "shard_db"] |> Path.join |> String.to_atom
- @shard_state [Application.get_env(:shard, :data_path), "shard_state"] |> Path.join |> String.to_atom
@peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom
def start_link(_) do
@@ -55,66 +54,64 @@ defmodule Shard.Manager do
end
def init(_) do
- :dets.open_file(@shard_db, [type: :set])
- for [{id, manifest, _pid}] <- :dets.match @shard_db, :"$1" do
- :dets.insert @shard_db, {id, manifest, nil}
- spawn fn -> Shard.Manifest.start manifest end
- end
+ Process.flag(:trap_exit, true)
- :dets.open_file(@shard_state, [type: :set])
+ :dets.open_file(@shard_db, [type: :set])
:dets.open_file(@peer_db, [type: :bag])
:ets.new(:shard_procs, [:set, :protected, :named_table])
- {:ok, nil}
+ {:ok, %{}}
end
- def handle_call({:register, shard_id, manifest, pid}, _from, state) do
- will_live = case :dets.lookup(@shard_db, shard_id) do
- [{ ^shard_id, _, pid }] when pid != nil -> not Process.alive?(pid)
- _ -> true
+ def handle_call({:find_or_start, shard_id, manifest}, _from, state) do
+ case :dets.lookup(@shard_db, shard_id) do
+ [] -> :dets.insert(@shard_db, {shard_id, manifest, nil})
+ _ -> nil
end
- reply = if will_live do
- Process.monitor(pid)
- :dets.insert(@shard_db, {shard_id, manifest, pid})
- :ok
- else
- :redundant
+
+ case :ets.lookup(:shard_procs, {shard_id, nil}) do
+ [] ->
+ {:ok, pid} = apply(Shard.Manifest.module(manifest), :start_link, [manifest])
+ :ets.insert(:shard_procs, {{shard_id, nil}, pid})
+ state = Map.put(state, pid, {shard_id, nil})
+ {:reply, pid, state}
+ pid ->
+ {:reply, pid, state}
end
- {:reply, reply, state}
end
def handle_cast({:dispatch_to, shard_id, path, pid}, state) do
:ets.insert(:shard_procs, { {shard_id, path}, pid })
- Process.monitor(pid)
+ state = Map.put(state, pid, {shard_id, path})
+ if path != nil do
+ Process.monitor(pid)
+ end
{:noreply, state}
end
- def handle_cast({:interested, conn_pid, peer_info, auth, shards}, state) do
- for shard_id <- shards do
- case :dets.lookup(@shard_db, shard_id) do
- [{ ^shard_id, _, pid }] ->
- :dets.insert(@peer_db, {shard_id, peer_info})
- GenServer.cast(pid, {:interested, conn_pid, auth})
- [] -> nil
- end
- end
+ def handle_cast({:peer_db_insert, shard_id, peer_info}, state) do
+ :dets.insert(@peer_db, {shard_id, peer_info})
{:noreply, state}
end
- def handle_cast({:not_interested, peer_info, shard_id}, state) do
+ def handle_cast({:peer_db_delete, shard_id, peer_info}, state) do
:dets.match_delete(@peer_db, {shard_id, peer_info})
{:noreply, state}
end
- def handle_cast({:shard_peer_db_insert, shard_id, peer_info}, state) do
- :dets.insert(@peer_db, {shard_id, peer_info})
- {:noreply, state}
+ def handle_info({:DOWN, _, :process, pid, reason}, state) do
+ handle_info({:EXIT, pid, reason}, state)
end
- def handle_info({:DOWN, _, :process, pid, _}, state) do
- :ets.match_delete(:shard_procs, {:_, pid})
- {:noreply, state}
+ def handle_info({:EXIT, pid, _reason}, state) do
+ case state[pid] do
+ nil -> {:noreply, state}
+ info ->
+ :ets.delete(:shard_procs, info)
+ state = Map.delete(state, pid)
+ {:noreply, state}
+ end
end
@@ -126,29 +123,37 @@ defmodule Shard.Manager do
Dispatch incoming message to correct shard process
"""
def incoming(conn_pid, peer_info, auth, {:interested, shards}) do
- GenServer.cast(__MODULE__, {:interested, conn_pid, peer_info, auth, shards})
+ for shard_id <- shards do
+ case :dets.lookup(@shard_db, shard_id) do
+ [{ ^shard_id, manifest, _ }] ->
+ GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info})
+ pid = case :ets.lookup(:shard_procs, {shard_id, nil}) do
+ [] ->
+ GenServer.call(__MODULE__, {:find_or_start, shard_id, manifest})
+ [{{^shard_id, nil}, pid}] -> pid
+ end
+ GenServer.cast(pid, {:interested, conn_pid, auth})
+ [] -> nil
+ end
+ end
end
def incoming(_conn_pid, peer_info, _auth, {:not_interested, shard}) do
- GenServer.cast(__MODULE__, {:not_interested, peer_info, shard})
+ GenServer.cast(__MODULE__, {:peer_db_delete, shard, peer_info})
end
def incoming(conn_pid, peer_info, auth, {shard_id, path, msg}) do
case :dets.lookup(@shard_db, shard_id) do
[] ->
GenServer.cast(conn_pid, {:send_msg, {:not_interested, shard_id}})
- [_] ->
- case :dets.match(@peer_db, {shard_id, peer_info}) do
+ [{ ^shard_id, manifest, _}] ->
+ GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info})
+ pid = case :ets.lookup(:shard_procs, {shard_id, path}) do
[] ->
- GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_info})
- _ -> nil
- end
- case :ets.lookup(:shard_procs, {shard_id, path}) do
- [{ {^shard_id, ^path}, pid }] ->
- GenServer.cast(pid, {:msg, conn_pid, auth, shard_id, path, msg})
- [] ->
- Logger.info("Warning: dropping message for #{inspect shard_id}/#{inspect path}, no handler running.\n\t#{inspect msg}")
+ GenServer.call(__MODULE__, {:find_or_start, shard_id, manifest})
+ [{ {^shard_id, ^path}, pid }] -> pid
end
+ GenServer.cast(pid, {:msg, conn_pid, auth, shard_id, path, msg})
end
end
@@ -184,8 +189,8 @@ defmodule Shard.Manager do
Return the saved state value for a shard
"""
def load_state(shard_id) do
- case :dets.lookup(@shard_state, shard_id) do
- [{^shard_id, state}] -> state
+ case :dets.lookup(@shard_db, shard_id) do
+ [{^shard_id, _, state}] -> state
_ -> nil
end
end
@@ -194,7 +199,10 @@ defmodule Shard.Manager do
Save a state value for a shard
"""
def save_state(shard_id, state) do
- :dets.insert(@shard_state, {shard_id, state})
+ case :dets.lookup(@shard_db, shard_id) do
+ [{^shard_id, manifest, _old_state}] ->
+ :dets.insert(@shard_db, {shard_id, manifest, state})
+ end
end
@@ -206,8 +214,8 @@ defmodule Shard.Manager do
Returns the pid for a shard if it exists
"""
def find_proc(shard_id) do
- case :dets.lookup(@shard_db, shard_id) do
- [{^shard_id, _, pid}] -> pid
+ case :ets.lookup(:shard_procs, {shard_id, nil}) do
+ [{{^shard_id, _}, pid}] -> pid
_ -> nil
end
end
@@ -218,13 +226,10 @@ defmodule Shard.Manager do
"""
def find_or_start(manifest) do
id = SData.term_hash manifest
- case find_proc id do
- nil ->
- case Shard.Manifest.start manifest do
- {:ok, pid} -> pid
- {:error, :redundant} -> find_proc id
- end
- pid -> pid
+ case :ets.lookup(:shard_procs, {id, nil}) do
+ [{{^id, nil}, pid}] -> pid
+ [] ->
+ GenServer.call(__MODULE__, {:find_or_start, id, manifest})
end
end