diff options
Diffstat (limited to 'shard/lib/manager.ex')
-rw-r--r-- | shard/lib/manager.ex | 147 |
1 files changed, 76 insertions, 71 deletions
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex index d6b493b..3a0e21c 100644 --- a/shard/lib/manager.ex +++ b/shard/lib/manager.ex @@ -5,14 +5,18 @@ defprotocol Shard.Manifest do The hash of the manifest is the unique identifier of that shard on the network. The Manifest protocol is a protocol implemented by the manifest structs for the - different shard types. It contains an operation start() that is able to launch the - correct process for this shard and connect to other peers that use it. + different shard types. It contains an operation module() that returns the main module + for the shard processes. The module must contain a function with the signature: + + {:ok, pid} = <module>.start_link(manifest) + + that will be called when the shard must be started. """ @doc""" - Start the corresponding Shard process + Get the module in question. """ - def start(manifest) + def module(manifest) end defmodule Shard.Manager do @@ -22,19 +26,15 @@ defmodule Shard.Manager do - :shard_db (persistent with DETS) List of - { id, manifest, pid | nil } - - - :shard_state (persistent with DETS) - - List of - { id, state } + { id, manifest, state } - :peer_db (persistent with DETS) Mult-list of { shard_id, peer_info } # TODO: add health info (last seen, ping, etc) - peer_info := {:inet, ip, port} | {:inet6, ip, port} | {:onion, name} + peer_info := {:inet, ip, port} + TODO peer_info |= {:inet6, ip, port} | {:onion, name} - :shard_procs (not persistent) @@ -47,7 +47,6 @@ defmodule Shard.Manager do require Logger @shard_db [Application.get_env(:shard, :data_path), "shard_db"] |> Path.join |> String.to_atom - @shard_state [Application.get_env(:shard, :data_path), "shard_state"] |> Path.join |> String.to_atom @peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom def start_link(_) do @@ -55,66 +54,64 @@ defmodule Shard.Manager do end def init(_) do - :dets.open_file(@shard_db, [type: :set]) - for [{id, manifest, _pid}] <- :dets.match @shard_db, :"$1" do - :dets.insert @shard_db, {id, manifest, nil} - spawn fn -> Shard.Manifest.start manifest end - end + Process.flag(:trap_exit, true) - :dets.open_file(@shard_state, [type: :set]) + :dets.open_file(@shard_db, [type: :set]) :dets.open_file(@peer_db, [type: :bag]) :ets.new(:shard_procs, [:set, :protected, :named_table]) - {:ok, nil} + {:ok, %{}} end - def handle_call({:register, shard_id, manifest, pid}, _from, state) do - will_live = case :dets.lookup(@shard_db, shard_id) do - [{ ^shard_id, _, pid }] when pid != nil -> not Process.alive?(pid) - _ -> true + def handle_call({:find_or_start, shard_id, manifest}, _from, state) do + case :dets.lookup(@shard_db, shard_id) do + [] -> :dets.insert(@shard_db, {shard_id, manifest, nil}) + _ -> nil end - reply = if will_live do - Process.monitor(pid) - :dets.insert(@shard_db, {shard_id, manifest, pid}) - :ok - else - :redundant + + case :ets.lookup(:shard_procs, {shard_id, nil}) do + [] -> + {:ok, pid} = apply(Shard.Manifest.module(manifest), :start_link, [manifest]) + :ets.insert(:shard_procs, {{shard_id, nil}, pid}) + state = Map.put(state, pid, {shard_id, nil}) + {:reply, pid, state} + pid -> + {:reply, pid, state} end - {:reply, reply, state} end def handle_cast({:dispatch_to, shard_id, path, pid}, state) do :ets.insert(:shard_procs, { {shard_id, path}, pid }) - Process.monitor(pid) + state = Map.put(state, pid, {shard_id, path}) + if path != nil do + Process.monitor(pid) + end {:noreply, state} end - def handle_cast({:interested, conn_pid, peer_info, auth, shards}, state) do - for shard_id <- shards do - case :dets.lookup(@shard_db, shard_id) do - [{ ^shard_id, _, pid }] -> - :dets.insert(@peer_db, {shard_id, peer_info}) - GenServer.cast(pid, {:interested, conn_pid, auth}) - [] -> nil - end - end + def handle_cast({:peer_db_insert, shard_id, peer_info}, state) do + :dets.insert(@peer_db, {shard_id, peer_info}) {:noreply, state} end - def handle_cast({:not_interested, peer_info, shard_id}, state) do + def handle_cast({:peer_db_delete, shard_id, peer_info}, state) do :dets.match_delete(@peer_db, {shard_id, peer_info}) {:noreply, state} end - def handle_cast({:shard_peer_db_insert, shard_id, peer_info}, state) do - :dets.insert(@peer_db, {shard_id, peer_info}) - {:noreply, state} + def handle_info({:DOWN, _, :process, pid, reason}, state) do + handle_info({:EXIT, pid, reason}, state) end - def handle_info({:DOWN, _, :process, pid, _}, state) do - :ets.match_delete(:shard_procs, {:_, pid}) - {:noreply, state} + def handle_info({:EXIT, pid, _reason}, state) do + case state[pid] do + nil -> {:noreply, state} + info -> + :ets.delete(:shard_procs, info) + state = Map.delete(state, pid) + {:noreply, state} + end end @@ -126,29 +123,37 @@ defmodule Shard.Manager do Dispatch incoming message to correct shard process """ def incoming(conn_pid, peer_info, auth, {:interested, shards}) do - GenServer.cast(__MODULE__, {:interested, conn_pid, peer_info, auth, shards}) + for shard_id <- shards do + case :dets.lookup(@shard_db, shard_id) do + [{ ^shard_id, manifest, _ }] -> + GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info}) + pid = case :ets.lookup(:shard_procs, {shard_id, nil}) do + [] -> + GenServer.call(__MODULE__, {:find_or_start, shard_id, manifest}) + [{{^shard_id, nil}, pid}] -> pid + end + GenServer.cast(pid, {:interested, conn_pid, auth}) + [] -> nil + end + end end def incoming(_conn_pid, peer_info, _auth, {:not_interested, shard}) do - GenServer.cast(__MODULE__, {:not_interested, peer_info, shard}) + GenServer.cast(__MODULE__, {:peer_db_delete, shard, peer_info}) end def incoming(conn_pid, peer_info, auth, {shard_id, path, msg}) do case :dets.lookup(@shard_db, shard_id) do [] -> GenServer.cast(conn_pid, {:send_msg, {:not_interested, shard_id}}) - [_] -> - case :dets.match(@peer_db, {shard_id, peer_info}) do + [{ ^shard_id, manifest, _}] -> + GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info}) + pid = case :ets.lookup(:shard_procs, {shard_id, path}) do [] -> - GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_info}) - _ -> nil - end - case :ets.lookup(:shard_procs, {shard_id, path}) do - [{ {^shard_id, ^path}, pid }] -> - GenServer.cast(pid, {:msg, conn_pid, auth, shard_id, path, msg}) - [] -> - Logger.info("Warning: dropping message for #{inspect shard_id}/#{inspect path}, no handler running.\n\t#{inspect msg}") + GenServer.call(__MODULE__, {:find_or_start, shard_id, manifest}) + [{ {^shard_id, ^path}, pid }] -> pid end + GenServer.cast(pid, {:msg, conn_pid, auth, shard_id, path, msg}) end end @@ -184,8 +189,8 @@ defmodule Shard.Manager do Return the saved state value for a shard """ def load_state(shard_id) do - case :dets.lookup(@shard_state, shard_id) do - [{^shard_id, state}] -> state + case :dets.lookup(@shard_db, shard_id) do + [{^shard_id, _, state}] -> state _ -> nil end end @@ -194,7 +199,10 @@ defmodule Shard.Manager do Save a state value for a shard """ def save_state(shard_id, state) do - :dets.insert(@shard_state, {shard_id, state}) + case :dets.lookup(@shard_db, shard_id) do + [{^shard_id, manifest, _old_state}] -> + :dets.insert(@shard_db, {shard_id, manifest, state}) + end end @@ -206,8 +214,8 @@ defmodule Shard.Manager do Returns the pid for a shard if it exists """ def find_proc(shard_id) do - case :dets.lookup(@shard_db, shard_id) do - [{^shard_id, _, pid}] -> pid + case :ets.lookup(:shard_procs, {shard_id, nil}) do + [{{^shard_id, _}, pid}] -> pid _ -> nil end end @@ -218,13 +226,10 @@ defmodule Shard.Manager do """ def find_or_start(manifest) do id = SData.term_hash manifest - case find_proc id do - nil -> - case Shard.Manifest.start manifest do - {:ok, pid} -> pid - {:error, :redundant} -> find_proc id - end - pid -> pid + case :ets.lookup(:shard_procs, {id, nil}) do + [{{^id, nil}, pid}] -> pid + [] -> + GenServer.call(__MODULE__, {:find_or_start, id, manifest}) end end |