aboutsummaryrefslogtreecommitdiff
path: root/shard/lib/manager.ex
diff options
context:
space:
mode:
Diffstat (limited to 'shard/lib/manager.ex')
-rw-r--r--shard/lib/manager.ex139
1 files changed, 61 insertions, 78 deletions
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex
index ccb750f..dd602bf 100644
--- a/shard/lib/manager.ex
+++ b/shard/lib/manager.ex
@@ -44,7 +44,7 @@ defmodule Shard.Manager do
- :connections (not persistent)
List of
- { nil | his_pk, nil | my_pk, pid, peer_info }
+ { peer_info, pid, nil | {my_pk, his_pk} }
And an internal table :
@@ -108,69 +108,62 @@ defmodule Shard.Manager do
{:noreply, state}
end
- def handle_cast({:interested, peer_info, shards}, state) do
+ def handle_cast({:interested, conn_pid, peer_info, auth, shards}, state) do
for shard_id <- shards do
case :dets.lookup(@shard_db, shard_id) do
[{ ^shard_id, _, pid }] ->
:dets.insert(@peer_db, {shard_id, peer_info})
- GenServer.cast(pid, {:interested, peer_info})
+ GenServer.cast(pid, {:interested, conn_pid, auth})
[] -> nil
end
end
{:noreply, state}
end
- def handle_cast({:not_interested, peer_id, shard_id}, state) do
- :dets.match_delete(@shard_peer_db, {shard_id, peer_id})
+ def handle_cast({:not_interested, peer_info, shard_id}, state) do
+ :dets.match_delete(@peer_db, {shard_id, peer_info})
{:noreply, state}
end
- def handle_cast({:shard_peer_db_insert, shard_id, peer_id}, state) do
- :dets.insert(@shard_peer_db, {shard_id, peer_id})
+ def handle_cast({:shard_peer_db_insert, shard_id, peer_info}, state) do
+ :dets.insert(@peer_db, {shard_id, peer_info})
{:noreply, state}
end
- def handle_cast({:peer_up, pk, pid, ip, port}, state) do
- for [pk2] <- :dets.match(@peer_db, {:'$1', :_, ip, port}) do
- if pk2 != pk do
- # obsolete peer information
- :dets.delete(@peer_db, pk2)
- :dets.match_delete(@shard_peer_db, {:_, pk2})
- end
- end
- :dets.insert(@peer_db, {pk, pid, ip, port})
+ def handle_cast({:peer_up, pid, peer_info, auth}, state) do
+ :ets.insert(:connections, {peer_info, pid, auth})
# Send interested message for all our shards
id_list = (for [{id, _, _}] <- :dets.match(@shard_db, :"$1"), do: id)
GenServer.cast(pid, {:send_msg, {:interested, id_list}})
- # Send queued messages
- for {_, msg, _} <- :ets.lookup(state.outbox, pk) do
- GenServer.cast(pid, {:send_msg, msg})
- end
- :ets.delete(state.outbox, pk)
+ # # Send queued messages
+ # for {_, msg, _} <- :ets.lookup(state.outbox, pk) do
+ # GenServer.cast(pid, {:send_msg, msg})
+ # end
+ # :ets.delete(state.outbox, pk)
{:noreply, state}
end
- def handle_cast({:peer_down, pk, ip, port}, state) do
- :dets.insert(@peer_db, {pk, nil, ip, port})
+ def handle_cast({:peer_down, peer_pid, peer_info, auth}, state) do
+ :ets.match_delete(:connections, {peer_info, peer_pid, auth})
{:noreply, state}
end
- def handle_cast({:connect_and_send, peer_id, msg}, state) do
- case :dets.lookup(@peer_db, peer_id) do
- [{^peer_id, nil, ip, port}] ->
- add_peer(ip, port, state)
- currtime = System.os_time :second
- :ets.insert(state.outbox, {peer_id, msg, currtime})
- outbox_cleanup = [ {{:_, :_, :'$1'}, [{:<, :'$1', currtime - 60}], [true]} ]
- :ets.select_delete(state.outbox, outbox_cleanup)
- _ ->
- Logger.info "Dropping message #{inspect msg} for peer #{inspect peer_id}: peer not in database"
- end
- {:noreply, state}
- end
+ # def handle_cast({:connect_and_send, peer_id, msg}, state) do
+ # case :dets.lookup(@peer_db, peer_id) do
+ # [{^peer_id, nil, ip, port}] ->
+ # add_peer(ip, port, state)
+ # currtime = System.os_time :second
+ # :ets.insert(state.outbox, {peer_id, msg, currtime})
+ # outbox_cleanup = [ {{:_, :_, :'$1'}, [{:<, :'$1', currtime - 60}], [true]} ]
+ # :ets.select_delete(state.outbox, outbox_cleanup)
+ # _ ->
+ # Logger.info "Dropping message #{inspect msg} for peer #{inspect peer_id}: peer not in database"
+ # end
+ # {:noreply, state}
+ # end
def handle_cast({:try_connect, pk_list}, state) do
for pk <- pk_list do
@@ -197,7 +190,7 @@ defmodule Shard.Manager do
spawn fn ->
case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do
{:ok, client} ->
- {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}})
+ {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port, is_client: true, auth: nil}})
:ok = :gen_tcp.controlling_process(client, pid)
_ ->
Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped"
@@ -206,35 +199,34 @@ defmodule Shard.Manager do
end
- # ====================
- # INTERFACE WITH PEERS
- # ====================
+ # ======================
+ # CALLED BY SNet.TcpConn
+ # ======================
- def incoming(conn_pid, {:interested, shards}) do
- GenServer.cast(__MODULE__, {:interested, peer_id, shards})
+ @doc"""
+ Dispatch incoming message to correct shard process
+ """
+ def incoming(conn_pid, peer_info, auth, {:interested, shards}) do
+ GenServer.cast(__MODULE__, {:interested, conn_pid, peer_info, auth, shards})
end
- def incoming(conn_pid, {:not_interested, shard}) do
- GenServer.cast(__MODULE__, {:not_interested, peer_id, shard})
+ def incoming(_conn_pid, peer_info, _auth, {:not_interested, shard}) do
+ GenServer.cast(__MODULE__, {:not_interested, peer_info, shard})
end
- @doc"""
- Dispatch incoming message to correct shard process
- """
- defp dispatch(conn_pid, {shard_id, path, msg}) do
- # TODO: auth
+ def incoming(conn_pid, peer_info, auth, {shard_id, path, msg}) do
case :dets.lookup(@shard_db, shard_id) do
[] ->
- __MODULE__.send(peer_id, {:not_interested, shard_id})
+ GenServer.cast(conn_pid, {:send_msg, {:not_interested, shard_id}})
[_] ->
- case :dets.match(@shard_peer_db, {shard_id, peer_id}) do
+ case :dets.match(@peer_db, {shard_id, peer_info}) do
[] ->
- GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_id})
+ GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_info})
_ -> nil
end
case :ets.lookup(:shard_procs, {shard_id, path}) do
[{ {^shard_id, ^path}, pid }] ->
- GenServer.cast(pid, {:msg, peer_id, shard_id, path, msg})
+ GenServer.cast(pid, {:msg, conn_pid, auth, shard_id, path, msg})
[] ->
Logger.info("Warning: dropping message for #{inspect shard_id}/#{inspect path}, no handler running.\n\t#{inspect msg}")
end
@@ -242,31 +234,22 @@ defmodule Shard.Manager do
end
- # =====================
- # INTERFACE WITH SHARDS
- # =====================
+ # ================
+ # CALLED BY Sapp.*
+ # ================
@doc"""
Send message to a peer specified by peer id
"""
- def send(peer_id, msg) do
- case :dets.lookup(@peer_db, peer_id) do
- [{ ^peer_id, pid, _, _}] when pid != nil->
- GenServer.cast(pid, {:send_msg, msg})
- _ ->
- GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg})
- end
+ def send_pid(pid, msg) do
+ GenServer.cast(pid, {:send_msg, msg})
end
@doc"""
- Send message to a peer through an authenticated channel
-
- his_auth: accepted users to talk to, either single pk or list of pk
-
- Returns true if a corresponding channel was open and msg was sent,
- false otherwise.
+ Send message to a peer specified by peer info.
+ Opens a connection if necessary.
"""
- def send(peer_id, my_auth, his_auth, msg) do
+ def send(_peer_info, _msg) do
# TODO
end
@@ -287,10 +270,10 @@ defmodule Shard.Manager do
end
@doc"""
- Return the list of all peer IDs that are interested in a certain shard
+ Return the list of all peer info for peers that are interested in a certain shard
"""
def get_shard_peers(shard_id) do
- for [x] <- :dets.match(@shard_peer_db, {shard_id, :"$1"}), do: x
+ for [x] <- :dets.match(@peer_db, {shard_id, :"$1"}), do: x
end
@doc"""
@@ -311,9 +294,9 @@ defmodule Shard.Manager do
end
- # ==========================
- # INTERFACE FOR OTHER THINGS
- # ==========================
+ # ================
+ # CALLED BY ANYONE
+ # ================
@doc"""
Connect to a peer specified by ip address and port
@@ -340,9 +323,9 @@ defmodule Shard.Manager do
end
@doc"""
- Return the list of all peers
+ Return the list of all connected peers
"""
- def list_peers() do
- for [x] <- :dets.match(@peer_db, :"$1"), do: x
+ def list_connections() do
+ for [x] <- :dets.match(:connections, :"$1"), do: x
end
end