diff options
Diffstat (limited to 'shard/lib/manager.ex')
-rw-r--r-- | shard/lib/manager.ex | 77 |
1 files changed, 55 insertions, 22 deletions
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex index 82984d6..3f2bddb 100644 --- a/shard/lib/manager.ex +++ b/shard/lib/manager.ex @@ -1,3 +1,8 @@ +defprotocol Shard.Manifest do + @doc "Start the corresponding Shard process" + def start(manifest) +end + defmodule Shard.Manager do @moduledoc""" Maintains several important tables : @@ -5,7 +10,7 @@ defmodule Shard.Manager do - :peer_db List of - { id, {conn_pid, con_start, conn_n_msg} | nil, ip, port, last_seen } + { id, pid | nil, ip, port } - :shard_db @@ -36,28 +41,44 @@ defmodule Shard.Manager do require Logger + @peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom + @shard_db [Application.get_env(:shard, :data_path), "shard_db"] |> Path.join |> String.to_atom + @shard_peer_db [Application.get_env(:shard, :data_path), "shard_peer_db"] |> Path.join |> String.to_atom + def start_link(my_port) do GenServer.start_link(__MODULE__, my_port, name: __MODULE__) end def init(my_port) do - :ets.new(:peer_db, [:set, :protected, :named_table]) - :ets.new(:shard_db, [:set, :protected, :named_table]) + :dets.open_file(@peer_db, [type: :set]) + for [{id, _pid, ip, port}] <- :dets.match @peer_db, :"$1" do + :dets.insert @peer_db, {id, nil, ip, port} + # connect blindly to everyone + add_peer(ip, port) + end + + :dets.open_file(@shard_db, [type: :set]) + for [{id, manifest, _pid}] <- :dets.match @shard_db, :"$1" do + :dets.insert @shard_db, {id, manifest, nil} + spawn fn -> Shard.Manifest.start manifest end + end + + :dets.open_file(@shard_peer_db, [type: :bag]) + :ets.new(:shard_procs, [:set, :protected, :named_table]) - :ets.new(:shard_peer_db, [:bag, :protected, :named_table]) outbox = :ets.new(:outbox, [:bag, :private]) {:ok, %{my_port: my_port, outbox: outbox} } end def handle_call({:register, shard_id, manifest, pid}, _from, state) do - will_live = case :ets.lookup(:shard_db, shard_id) do - [{ ^shard_id, _, pid }] -> not Process.alive?(pid) + will_live = case :dets.lookup(@shard_db, shard_id) do + [{ ^shard_id, _, pid }] when pid != nil -> not Process.alive?(pid) _ -> true end reply = if will_live do Process.monitor(pid) - :ets.insert(:shard_db, {shard_id, manifest, pid}) + :dets.insert(@shard_db, {shard_id, manifest, pid}) :ok else :redundant @@ -73,9 +94,9 @@ defmodule Shard.Manager do def handle_cast({:interested, peer_id, shards}, state) do for shard_id <- shards do - case :ets.lookup(:shard_db, shard_id) do + case :dets.lookup(@shard_db, shard_id) do [{ ^shard_id, _, pid }] -> - :ets.insert(:shard_peer_db, {shard_id, peer_id}) + :dets.insert(@shard_peer_db, {shard_id, peer_id}) GenServer.cast(pid, {:interested, peer_id}) [] -> nil end @@ -84,27 +105,27 @@ defmodule Shard.Manager do end def handle_cast({:not_interested, peer_id, shard_id}, state) do - :ets.match_delete(:shard_peer_db, {shard_id, peer_id}) + :dets.match_delete(@shard_peer_db, {shard_id, peer_id}) {:noreply, state} end def handle_cast({:shard_peer_db_insert, shard_id, peer_id}, state) do - :ets.insert(:shard_peer_db, {shard_id, peer_id}) + :dets.insert(@shard_peer_db, {shard_id, peer_id}) {:noreply, state} end def handle_cast({:peer_up, pk, pid, ip, port}, state) do - for [pk2] <- :ets.match(:peer_db, {:'$1', :_, ip, port}) do + for [pk2] <- :dets.match(@peer_db, {:'$1', :_, ip, port}) do if pk2 != pk do # obsolete peer information - :ets.delete(:peer_db, pk2) - :ets.match_delete(:shard_peer_db, {:_, pk2}) + :dets.delete(@peer_db, pk2) + :dets.match_delete(@shard_peer_db, {:_, pk2}) end end - :ets.insert(:peer_db, {pk, pid, ip, port}) + :dets.insert(@peer_db, {pk, pid, ip, port}) # Send interested message for all our shards - id_list = (for {id, _, _} <- :ets.tab2list(:shard_db), do: id) + id_list = (for [{id, _, _}] <- :dets.match(@shard_db, :"$1"), do: id) GenServer.cast(pid, {:send_msg, {:interested, id_list}}) # Send queued messages @@ -117,12 +138,12 @@ defmodule Shard.Manager do end def handle_cast({:peer_down, pk, ip, port}, state) do - :ets.insert(:peer_db, {pk, nil, ip, port}) + :dets.insert(@peer_db, {pk, nil, ip, port}) {:noreply, state} end def handle_cast({:connect_and_send, peer_id, msg}, state) do - case :ets.lookup(:peer_db, peer_id) do + case :dets.lookup(@peer_db, peer_id) do [{^peer_id, nil, ip, port}] -> add_peer(ip, port, state) currtime = System.os_time :second @@ -139,7 +160,7 @@ defmodule Shard.Manager do def handle_cast({:try_connect, pk_list}, state) do for pk <- pk_list do - case :ets.lookup(:peer_db, pk) do + case :dets.lookup(@peer_db, pk) do [{^pk, nil, ip, port}] -> add_peer(ip, port, state) _ -> nil @@ -187,7 +208,7 @@ defmodule Shard.Manager do Send message to a peer specified by peer id """ def send(peer_id, msg) do - case :ets.lookup(:peer_db, peer_id) do + case :dets.lookup(@peer_db, peer_id) do [{ ^peer_id, pid, _, _}] when pid != nil-> GenServer.cast(pid, {:send_msg, msg}) _ -> @@ -199,11 +220,11 @@ defmodule Shard.Manager do Dispatch incoming message to correct shard process """ def dispatch(peer_id, {shard_id, path, msg}) do - case :ets.lookup(:shard_db, shard_id) do + case :dets.lookup(@shard_db, shard_id) do [] -> __MODULE__.send(peer_id, {:not_interested, shard_id}) [_] -> - case :ets.match(:shard_peer_db, {shard_id, peer_id}) do + case :dets.match(@shard_peer_db, {shard_id, peer_id}) do [] -> GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_id}) _ -> nil @@ -240,4 +261,16 @@ defmodule Shard.Manager do def dispatch_to(shard_id, path, pid) do GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid}) end + + def list_shards() do + for [x] <- :dets.match(@shard_db, :"$1"), do: x + end + + def list_peers() do + for [x] <- :dets.match(@peer_db, :"$1"), do: x + end + + def get_shard_peers(shard_id) do + for [x] <- :dets.match(@shard_peer_db, {shard_id, :"$1"}), do: x + end end |