aboutsummaryrefslogtreecommitdiff
path: root/shard/lib/manager.ex
diff options
context:
space:
mode:
Diffstat (limited to 'shard/lib/manager.ex')
-rw-r--r--shard/lib/manager.ex169
1 files changed, 94 insertions, 75 deletions
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex
index 617378c..ccb750f 100644
--- a/shard/lib/manager.ex
+++ b/shard/lib/manager.ex
@@ -19,38 +19,42 @@ defmodule Shard.Manager do
@moduledoc"""
Maintains several important tables :
- - :peer_db
-
- List of
- { id, pid | nil, ip, port }
-
- - :shard_db
+ - :shard_db (persistent with DETS)
List of
{ id, manifest, pid | nil }
- - :shard_state
+ - :shard_state (persistent with DETS)
List of
{ id, state }
- - :shard_procs
+ - :peer_db (persistent with DETS)
+
+ Mult-list of
+ { shard_id, peer_info } # TODO: add health info (last seen, ping, etc)
+
+ peer_info := {:inet4, ip, port} | {:inet6, ip, port} | {:onion, name}
+
+ - :shard_procs (not persistent)
List of
{ {id, path}, pid }
- - :shard_peer_db
-
- Mult-list of
- { shard_id, peer_id }
+ - :connections (not persistent)
+ List of
+ { nil | his_pk, nil | my_pk, pid, peer_info }
And an internal table :
- - :outbox
+ - :outbox (not persistent)
Multi-list of
- { peer_id, message, time_inserted }
+ { dest, auth_info, message, time_inserted }
+
+ dest := peer_info
+ auth_info := nil | { his_pk, my_pk_list }
"""
@@ -58,23 +62,15 @@ defmodule Shard.Manager do
require Logger
- @peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom
@shard_db [Application.get_env(:shard, :data_path), "shard_db"] |> Path.join |> String.to_atom
@shard_state [Application.get_env(:shard, :data_path), "shard_state"] |> Path.join |> String.to_atom
- @shard_peer_db [Application.get_env(:shard, :data_path), "shard_peer_db"] |> Path.join |> String.to_atom
+ @peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom
def start_link(my_port) do
GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
end
def init(my_port) do
- :dets.open_file(@peer_db, [type: :set])
- for [{id, _pid, ip, port}] <- :dets.match @peer_db, :"$1" do
- :dets.insert @peer_db, {id, nil, ip, port}
- # connect blindly to everyone
- add_peer(ip, port)
- end
-
:dets.open_file(@shard_db, [type: :set])
for [{id, manifest, _pid}] <- :dets.match @shard_db, :"$1" do
:dets.insert @shard_db, {id, manifest, nil}
@@ -82,9 +78,10 @@ defmodule Shard.Manager do
end
:dets.open_file(@shard_state, [type: :set])
- :dets.open_file(@shard_peer_db, [type: :bag])
+ :dets.open_file(@peer_db, [type: :bag])
:ets.new(:shard_procs, [:set, :protected, :named_table])
+ :ets.new(:connections, [:bag, :protected, :named_table])
outbox = :ets.new(:outbox, [:bag, :private])
{:ok, %{my_port: my_port, outbox: outbox} }
@@ -111,12 +108,12 @@ defmodule Shard.Manager do
{:noreply, state}
end
- def handle_cast({:interested, peer_id, shards}, state) do
+ def handle_cast({:interested, peer_info, shards}, state) do
for shard_id <- shards do
case :dets.lookup(@shard_db, shard_id) do
[{ ^shard_id, _, pid }] ->
- :dets.insert(@shard_peer_db, {shard_id, peer_id})
- GenServer.cast(pid, {:interested, peer_id})
+ :dets.insert(@peer_db, {shard_id, peer_info})
+ GenServer.cast(pid, {:interested, peer_info})
[] -> nil
end
end
@@ -209,34 +206,23 @@ defmodule Shard.Manager do
end
- # ================
- # PUBLIC INTERFACE
- # ================
-
+ # ====================
+ # INTERFACE WITH PEERS
+ # ====================
- @doc"""
- Connect to a peer specified by ip address and port
- """
- def add_peer(ip, port) do
- GenServer.cast(__MODULE__, {:add_peer, ip, port})
+ def incoming(conn_pid, {:interested, shards}) do
+ GenServer.cast(__MODULE__, {:interested, peer_id, shards})
end
- @doc"""
- Send message to a peer specified by peer id
- """
- def send(peer_id, msg) do
- case :dets.lookup(@peer_db, peer_id) do
- [{ ^peer_id, pid, _, _}] when pid != nil->
- GenServer.cast(pid, {:send_msg, msg})
- _ ->
- GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg})
- end
+ def incoming(conn_pid, {:not_interested, shard}) do
+ GenServer.cast(__MODULE__, {:not_interested, peer_id, shard})
end
@doc"""
Dispatch incoming message to correct shard process
"""
- def dispatch(peer_id, {shard_id, path, msg}) do
+ defp dispatch(conn_pid, {shard_id, path, msg}) do
+ # TODO: auth
case :dets.lookup(@shard_db, shard_id) do
[] ->
__MODULE__.send(peer_id, {:not_interested, shard_id})
@@ -255,12 +241,33 @@ defmodule Shard.Manager do
end
end
- def dispatch(peer_id, {:interested, shards}) do
- GenServer.cast(__MODULE__, {:interested, peer_id, shards})
+
+ # =====================
+ # INTERFACE WITH SHARDS
+ # =====================
+
+ @doc"""
+ Send message to a peer specified by peer id
+ """
+ def send(peer_id, msg) do
+ case :dets.lookup(@peer_db, peer_id) do
+ [{ ^peer_id, pid, _, _}] when pid != nil->
+ GenServer.cast(pid, {:send_msg, msg})
+ _ ->
+ GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg})
+ end
end
- def dispatch(peer_id, {:not_interested, shard}) do
- GenServer.cast(__MODULE__, {:not_interested, peer_id, shard})
+ @doc"""
+ Send message to a peer through an authenticated channel
+
+ his_auth: accepted users to talk to, either single pk or list of pk
+
+ Returns true if a corresponding channel was open and msg was sent,
+ false otherwise.
+ """
+ def send(peer_id, my_auth, his_auth, msg) do
+ # TODO
end
@doc"""
@@ -273,16 +280,6 @@ defmodule Shard.Manager do
end
@doc"""
- Returns the pid for a shard if it exists
- """
- def find_proc(shard_id) do
- case :dets.lookup(@shard_db, shard_id) do
- [{^shard_id, _, pid}] -> pid
- _ -> nil
- end
- end
-
- @doc"""
Register a process as the handler for shard packets for a given path.
"""
def dispatch_to(shard_id, path, pid) do
@@ -290,20 +287,6 @@ defmodule Shard.Manager do
end
@doc"""
- Return the list of all shards.
- """
- def list_shards() do
- for [x] <- :dets.match(@shard_db, :"$1"), do: x
- end
-
- @doc"""
- Return the list of all peers
- """
- def list_peers() do
- for [x] <- :dets.match(@peer_db, :"$1"), do: x
- end
-
- @doc"""
Return the list of all peer IDs that are interested in a certain shard
"""
def get_shard_peers(shard_id) do
@@ -326,4 +309,40 @@ defmodule Shard.Manager do
def save_state(shard_id, state) do
:dets.insert(@shard_state, {shard_id, state})
end
+
+
+ # ==========================
+ # INTERFACE FOR OTHER THINGS
+ # ==========================
+
+ @doc"""
+ Connect to a peer specified by ip address and port
+ """
+ def add_peer(ip, port) do
+ GenServer.cast(__MODULE__, {:add_peer, ip, port})
+ end
+
+ @doc"""
+ Returns the pid for a shard if it exists
+ """
+ def find_proc(shard_id) do
+ case :dets.lookup(@shard_db, shard_id) do
+ [{^shard_id, _, pid}] -> pid
+ _ -> nil
+ end
+ end
+
+ @doc"""
+ Return the list of all shards.
+ """
+ def list_shards() do
+ for [x] <- :dets.match(@shard_db, :"$1"), do: x
+ end
+
+ @doc"""
+ Return the list of all peers
+ """
+ def list_peers() do
+ for [x] <- :dets.match(@peer_db, :"$1"), do: x
+ end
end