diff options
Diffstat (limited to 'shard/lib/manager.ex')
-rw-r--r-- | shard/lib/manager.ex | 139 |
1 files changed, 61 insertions, 78 deletions
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex index ccb750f..dd602bf 100644 --- a/shard/lib/manager.ex +++ b/shard/lib/manager.ex @@ -44,7 +44,7 @@ defmodule Shard.Manager do - :connections (not persistent) List of - { nil | his_pk, nil | my_pk, pid, peer_info } + { peer_info, pid, nil | {my_pk, his_pk} } And an internal table : @@ -108,69 +108,62 @@ defmodule Shard.Manager do {:noreply, state} end - def handle_cast({:interested, peer_info, shards}, state) do + def handle_cast({:interested, conn_pid, peer_info, auth, shards}, state) do for shard_id <- shards do case :dets.lookup(@shard_db, shard_id) do [{ ^shard_id, _, pid }] -> :dets.insert(@peer_db, {shard_id, peer_info}) - GenServer.cast(pid, {:interested, peer_info}) + GenServer.cast(pid, {:interested, conn_pid, auth}) [] -> nil end end {:noreply, state} end - def handle_cast({:not_interested, peer_id, shard_id}, state) do - :dets.match_delete(@shard_peer_db, {shard_id, peer_id}) + def handle_cast({:not_interested, peer_info, shard_id}, state) do + :dets.match_delete(@peer_db, {shard_id, peer_info}) {:noreply, state} end - def handle_cast({:shard_peer_db_insert, shard_id, peer_id}, state) do - :dets.insert(@shard_peer_db, {shard_id, peer_id}) + def handle_cast({:shard_peer_db_insert, shard_id, peer_info}, state) do + :dets.insert(@peer_db, {shard_id, peer_info}) {:noreply, state} end - def handle_cast({:peer_up, pk, pid, ip, port}, state) do - for [pk2] <- :dets.match(@peer_db, {:'$1', :_, ip, port}) do - if pk2 != pk do - # obsolete peer information - :dets.delete(@peer_db, pk2) - :dets.match_delete(@shard_peer_db, {:_, pk2}) - end - end - :dets.insert(@peer_db, {pk, pid, ip, port}) + def handle_cast({:peer_up, pid, peer_info, auth}, state) do + :ets.insert(:connections, {peer_info, pid, auth}) # Send interested message for all our shards id_list = (for [{id, _, _}] <- :dets.match(@shard_db, :"$1"), do: id) GenServer.cast(pid, {:send_msg, {:interested, id_list}}) - # Send queued messages - for {_, msg, _} <- :ets.lookup(state.outbox, pk) do - GenServer.cast(pid, {:send_msg, msg}) - end - :ets.delete(state.outbox, pk) + # # Send queued messages + # for {_, msg, _} <- :ets.lookup(state.outbox, pk) do + # GenServer.cast(pid, {:send_msg, msg}) + # end + # :ets.delete(state.outbox, pk) {:noreply, state} end - def handle_cast({:peer_down, pk, ip, port}, state) do - :dets.insert(@peer_db, {pk, nil, ip, port}) + def handle_cast({:peer_down, peer_pid, peer_info, auth}, state) do + :ets.match_delete(:connections, {peer_info, peer_pid, auth}) {:noreply, state} end - def handle_cast({:connect_and_send, peer_id, msg}, state) do - case :dets.lookup(@peer_db, peer_id) do - [{^peer_id, nil, ip, port}] -> - add_peer(ip, port, state) - currtime = System.os_time :second - :ets.insert(state.outbox, {peer_id, msg, currtime}) - outbox_cleanup = [ {{:_, :_, :'$1'}, [{:<, :'$1', currtime - 60}], [true]} ] - :ets.select_delete(state.outbox, outbox_cleanup) - _ -> - Logger.info "Dropping message #{inspect msg} for peer #{inspect peer_id}: peer not in database" - end - {:noreply, state} - end + # def handle_cast({:connect_and_send, peer_id, msg}, state) do + # case :dets.lookup(@peer_db, peer_id) do + # [{^peer_id, nil, ip, port}] -> + # add_peer(ip, port, state) + # currtime = System.os_time :second + # :ets.insert(state.outbox, {peer_id, msg, currtime}) + # outbox_cleanup = [ {{:_, :_, :'$1'}, [{:<, :'$1', currtime - 60}], [true]} ] + # :ets.select_delete(state.outbox, outbox_cleanup) + # _ -> + # Logger.info "Dropping message #{inspect msg} for peer #{inspect peer_id}: peer not in database" + # end + # {:noreply, state} + # end def handle_cast({:try_connect, pk_list}, state) do for pk <- pk_list do @@ -197,7 +190,7 @@ defmodule Shard.Manager do spawn fn -> case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do {:ok, client} -> - {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}}) + {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port, is_client: true, auth: nil}}) :ok = :gen_tcp.controlling_process(client, pid) _ -> Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped" @@ -206,35 +199,34 @@ defmodule Shard.Manager do end - # ==================== - # INTERFACE WITH PEERS - # ==================== + # ====================== + # CALLED BY SNet.TcpConn + # ====================== - def incoming(conn_pid, {:interested, shards}) do - GenServer.cast(__MODULE__, {:interested, peer_id, shards}) + @doc""" + Dispatch incoming message to correct shard process + """ + def incoming(conn_pid, peer_info, auth, {:interested, shards}) do + GenServer.cast(__MODULE__, {:interested, conn_pid, peer_info, auth, shards}) end - def incoming(conn_pid, {:not_interested, shard}) do - GenServer.cast(__MODULE__, {:not_interested, peer_id, shard}) + def incoming(_conn_pid, peer_info, _auth, {:not_interested, shard}) do + GenServer.cast(__MODULE__, {:not_interested, peer_info, shard}) end - @doc""" - Dispatch incoming message to correct shard process - """ - defp dispatch(conn_pid, {shard_id, path, msg}) do - # TODO: auth + def incoming(conn_pid, peer_info, auth, {shard_id, path, msg}) do case :dets.lookup(@shard_db, shard_id) do [] -> - __MODULE__.send(peer_id, {:not_interested, shard_id}) + GenServer.cast(conn_pid, {:send_msg, {:not_interested, shard_id}}) [_] -> - case :dets.match(@shard_peer_db, {shard_id, peer_id}) do + case :dets.match(@peer_db, {shard_id, peer_info}) do [] -> - GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_id}) + GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_info}) _ -> nil end case :ets.lookup(:shard_procs, {shard_id, path}) do [{ {^shard_id, ^path}, pid }] -> - GenServer.cast(pid, {:msg, peer_id, shard_id, path, msg}) + GenServer.cast(pid, {:msg, conn_pid, auth, shard_id, path, msg}) [] -> Logger.info("Warning: dropping message for #{inspect shard_id}/#{inspect path}, no handler running.\n\t#{inspect msg}") end @@ -242,31 +234,22 @@ defmodule Shard.Manager do end - # ===================== - # INTERFACE WITH SHARDS - # ===================== + # ================ + # CALLED BY Sapp.* + # ================ @doc""" Send message to a peer specified by peer id """ - def send(peer_id, msg) do - case :dets.lookup(@peer_db, peer_id) do - [{ ^peer_id, pid, _, _}] when pid != nil-> - GenServer.cast(pid, {:send_msg, msg}) - _ -> - GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg}) - end + def send_pid(pid, msg) do + GenServer.cast(pid, {:send_msg, msg}) end @doc""" - Send message to a peer through an authenticated channel - - his_auth: accepted users to talk to, either single pk or list of pk - - Returns true if a corresponding channel was open and msg was sent, - false otherwise. + Send message to a peer specified by peer info. + Opens a connection if necessary. """ - def send(peer_id, my_auth, his_auth, msg) do + def send(_peer_info, _msg) do # TODO end @@ -287,10 +270,10 @@ defmodule Shard.Manager do end @doc""" - Return the list of all peer IDs that are interested in a certain shard + Return the list of all peer info for peers that are interested in a certain shard """ def get_shard_peers(shard_id) do - for [x] <- :dets.match(@shard_peer_db, {shard_id, :"$1"}), do: x + for [x] <- :dets.match(@peer_db, {shard_id, :"$1"}), do: x end @doc""" @@ -311,9 +294,9 @@ defmodule Shard.Manager do end - # ========================== - # INTERFACE FOR OTHER THINGS - # ========================== + # ================ + # CALLED BY ANYONE + # ================ @doc""" Connect to a peer specified by ip address and port @@ -340,9 +323,9 @@ defmodule Shard.Manager do end @doc""" - Return the list of all peers + Return the list of all connected peers """ - def list_peers() do - for [x] <- :dets.match(@peer_db, :"$1"), do: x + def list_connections() do + for [x] <- :dets.match(:connections, :"$1"), do: x end end |