diff options
Diffstat (limited to 'shard/lib/manager.ex')
-rw-r--r-- | shard/lib/manager.ex | 169 |
1 files changed, 94 insertions, 75 deletions
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex index 617378c..ccb750f 100644 --- a/shard/lib/manager.ex +++ b/shard/lib/manager.ex @@ -19,38 +19,42 @@ defmodule Shard.Manager do @moduledoc""" Maintains several important tables : - - :peer_db - - List of - { id, pid | nil, ip, port } - - - :shard_db + - :shard_db (persistent with DETS) List of { id, manifest, pid | nil } - - :shard_state + - :shard_state (persistent with DETS) List of { id, state } - - :shard_procs + - :peer_db (persistent with DETS) + + Mult-list of + { shard_id, peer_info } # TODO: add health info (last seen, ping, etc) + + peer_info := {:inet4, ip, port} | {:inet6, ip, port} | {:onion, name} + + - :shard_procs (not persistent) List of { {id, path}, pid } - - :shard_peer_db - - Mult-list of - { shard_id, peer_id } + - :connections (not persistent) + List of + { nil | his_pk, nil | my_pk, pid, peer_info } And an internal table : - - :outbox + - :outbox (not persistent) Multi-list of - { peer_id, message, time_inserted } + { dest, auth_info, message, time_inserted } + + dest := peer_info + auth_info := nil | { his_pk, my_pk_list } """ @@ -58,23 +62,15 @@ defmodule Shard.Manager do require Logger - @peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom @shard_db [Application.get_env(:shard, :data_path), "shard_db"] |> Path.join |> String.to_atom @shard_state [Application.get_env(:shard, :data_path), "shard_state"] |> Path.join |> String.to_atom - @shard_peer_db [Application.get_env(:shard, :data_path), "shard_peer_db"] |> Path.join |> String.to_atom + @peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom def start_link(my_port) do GenServer.start_link(__MODULE__, my_port, name: __MODULE__) end def init(my_port) do - :dets.open_file(@peer_db, [type: :set]) - for [{id, _pid, ip, port}] <- :dets.match @peer_db, :"$1" do - :dets.insert @peer_db, {id, nil, ip, port} - # connect blindly to everyone - add_peer(ip, port) - end - :dets.open_file(@shard_db, [type: :set]) for [{id, manifest, _pid}] <- :dets.match @shard_db, :"$1" do :dets.insert @shard_db, {id, manifest, nil} @@ -82,9 +78,10 @@ defmodule Shard.Manager do end :dets.open_file(@shard_state, [type: :set]) - :dets.open_file(@shard_peer_db, [type: :bag]) + :dets.open_file(@peer_db, [type: :bag]) :ets.new(:shard_procs, [:set, :protected, :named_table]) + :ets.new(:connections, [:bag, :protected, :named_table]) outbox = :ets.new(:outbox, [:bag, :private]) {:ok, %{my_port: my_port, outbox: outbox} } @@ -111,12 +108,12 @@ defmodule Shard.Manager do {:noreply, state} end - def handle_cast({:interested, peer_id, shards}, state) do + def handle_cast({:interested, peer_info, shards}, state) do for shard_id <- shards do case :dets.lookup(@shard_db, shard_id) do [{ ^shard_id, _, pid }] -> - :dets.insert(@shard_peer_db, {shard_id, peer_id}) - GenServer.cast(pid, {:interested, peer_id}) + :dets.insert(@peer_db, {shard_id, peer_info}) + GenServer.cast(pid, {:interested, peer_info}) [] -> nil end end @@ -209,34 +206,23 @@ defmodule Shard.Manager do end - # ================ - # PUBLIC INTERFACE - # ================ - + # ==================== + # INTERFACE WITH PEERS + # ==================== - @doc""" - Connect to a peer specified by ip address and port - """ - def add_peer(ip, port) do - GenServer.cast(__MODULE__, {:add_peer, ip, port}) + def incoming(conn_pid, {:interested, shards}) do + GenServer.cast(__MODULE__, {:interested, peer_id, shards}) end - @doc""" - Send message to a peer specified by peer id - """ - def send(peer_id, msg) do - case :dets.lookup(@peer_db, peer_id) do - [{ ^peer_id, pid, _, _}] when pid != nil-> - GenServer.cast(pid, {:send_msg, msg}) - _ -> - GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg}) - end + def incoming(conn_pid, {:not_interested, shard}) do + GenServer.cast(__MODULE__, {:not_interested, peer_id, shard}) end @doc""" Dispatch incoming message to correct shard process """ - def dispatch(peer_id, {shard_id, path, msg}) do + defp dispatch(conn_pid, {shard_id, path, msg}) do + # TODO: auth case :dets.lookup(@shard_db, shard_id) do [] -> __MODULE__.send(peer_id, {:not_interested, shard_id}) @@ -255,12 +241,33 @@ defmodule Shard.Manager do end end - def dispatch(peer_id, {:interested, shards}) do - GenServer.cast(__MODULE__, {:interested, peer_id, shards}) + + # ===================== + # INTERFACE WITH SHARDS + # ===================== + + @doc""" + Send message to a peer specified by peer id + """ + def send(peer_id, msg) do + case :dets.lookup(@peer_db, peer_id) do + [{ ^peer_id, pid, _, _}] when pid != nil-> + GenServer.cast(pid, {:send_msg, msg}) + _ -> + GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg}) + end end - def dispatch(peer_id, {:not_interested, shard}) do - GenServer.cast(__MODULE__, {:not_interested, peer_id, shard}) + @doc""" + Send message to a peer through an authenticated channel + + his_auth: accepted users to talk to, either single pk or list of pk + + Returns true if a corresponding channel was open and msg was sent, + false otherwise. + """ + def send(peer_id, my_auth, his_auth, msg) do + # TODO end @doc""" @@ -273,16 +280,6 @@ defmodule Shard.Manager do end @doc""" - Returns the pid for a shard if it exists - """ - def find_proc(shard_id) do - case :dets.lookup(@shard_db, shard_id) do - [{^shard_id, _, pid}] -> pid - _ -> nil - end - end - - @doc""" Register a process as the handler for shard packets for a given path. """ def dispatch_to(shard_id, path, pid) do @@ -290,20 +287,6 @@ defmodule Shard.Manager do end @doc""" - Return the list of all shards. - """ - def list_shards() do - for [x] <- :dets.match(@shard_db, :"$1"), do: x - end - - @doc""" - Return the list of all peers - """ - def list_peers() do - for [x] <- :dets.match(@peer_db, :"$1"), do: x - end - - @doc""" Return the list of all peer IDs that are interested in a certain shard """ def get_shard_peers(shard_id) do @@ -326,4 +309,40 @@ defmodule Shard.Manager do def save_state(shard_id, state) do :dets.insert(@shard_state, {shard_id, state}) end + + + # ========================== + # INTERFACE FOR OTHER THINGS + # ========================== + + @doc""" + Connect to a peer specified by ip address and port + """ + def add_peer(ip, port) do + GenServer.cast(__MODULE__, {:add_peer, ip, port}) + end + + @doc""" + Returns the pid for a shard if it exists + """ + def find_proc(shard_id) do + case :dets.lookup(@shard_db, shard_id) do + [{^shard_id, _, pid}] -> pid + _ -> nil + end + end + + @doc""" + Return the list of all shards. + """ + def list_shards() do + for [x] <- :dets.match(@shard_db, :"$1"), do: x + end + + @doc""" + Return the list of all peers + """ + def list_peers() do + for [x] <- :dets.match(@peer_db, :"$1"), do: x + end end |