aboutsummaryrefslogtreecommitdiff
path: root/shard/lib/manager.ex
diff options
context:
space:
mode:
Diffstat (limited to 'shard/lib/manager.ex')
-rw-r--r--shard/lib/manager.ex52
1 files changed, 28 insertions, 24 deletions
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex
index dd602bf..3f5416d 100644
--- a/shard/lib/manager.ex
+++ b/shard/lib/manager.ex
@@ -51,10 +51,11 @@ defmodule Shard.Manager do
- :outbox (not persistent)
Multi-list of
- { dest, auth_info, message, time_inserted }
+ { dest_peer_info, message, time_inserted }
dest := peer_info
- auth_info := nil | { his_pk, my_pk_list }
+
+ No support for messages on authenticated channels
"""
@@ -137,11 +138,11 @@ defmodule Shard.Manager do
id_list = (for [{id, _, _}] <- :dets.match(@shard_db, :"$1"), do: id)
GenServer.cast(pid, {:send_msg, {:interested, id_list}})
- # # Send queued messages
- # for {_, msg, _} <- :ets.lookup(state.outbox, pk) do
- # GenServer.cast(pid, {:send_msg, msg})
- # end
- # :ets.delete(state.outbox, pk)
+ # Send queued messages
+ for {_, msg, _} <- :ets.lookup(state.outbox, peer_info) do
+ GenServer.cast(pid, {:send_msg, msg})
+ end
+ :ets.delete(state.outbox, peer_info)
{:noreply, state}
end
@@ -151,19 +152,17 @@ defmodule Shard.Manager do
{:noreply, state}
end
- # def handle_cast({:connect_and_send, peer_id, msg}, state) do
- # case :dets.lookup(@peer_db, peer_id) do
- # [{^peer_id, nil, ip, port}] ->
- # add_peer(ip, port, state)
- # currtime = System.os_time :second
- # :ets.insert(state.outbox, {peer_id, msg, currtime})
- # outbox_cleanup = [ {{:_, :_, :'$1'}, [{:<, :'$1', currtime - 60}], [true]} ]
- # :ets.select_delete(state.outbox, outbox_cleanup)
- # _ ->
- # Logger.info "Dropping message #{inspect msg} for peer #{inspect peer_id}: peer not in database"
- # end
- # {:noreply, state}
- # end
+ def handle_cast({:connect_and_send, peer_info, msg}, state) do
+ case peer_info do
+ {:tcp4, ip, port} ->
+ add_peer(ip, port, state)
+ currtime = System.os_time :second
+ :ets.insert(state.outbox, {peer_info, msg, currtime})
+ outbox_cleanup = [ {{:_, :_, :'$1'}, [{:<, :'$1', currtime - 60}], [true]} ]
+ :ets.select_delete(state.outbox, outbox_cleanup)
+ end
+ {:noreply, state}
+ end
def handle_cast({:try_connect, pk_list}, state) do
for pk <- pk_list do
@@ -249,8 +248,13 @@ defmodule Shard.Manager do
Send message to a peer specified by peer info.
Opens a connection if necessary.
"""
- def send(_peer_info, _msg) do
- # TODO
+ def send(peer_info, msg) do
+ case :ets.lookup(:connections, peer_info) do
+ [{^peer_info, pid, _auth}|_] ->
+ GenServer.cast(pid, {:send_msg, msg})
+ [] ->
+ GenServer.cast(__MODULE__, {:connect_and_send, peer_info, msg})
+ end
end
@doc"""
@@ -273,7 +277,7 @@ defmodule Shard.Manager do
Return the list of all peer info for peers that are interested in a certain shard
"""
def get_shard_peers(shard_id) do
- for [x] <- :dets.match(@peer_db, {shard_id, :"$1"}), do: x
+ for {_, peer_info} <- :dets.lookup(@peer_db, shard_id), do: peer_info
end
@doc"""
@@ -326,6 +330,6 @@ defmodule Shard.Manager do
Return the list of all connected peers
"""
def list_connections() do
- for [x] <- :dets.match(:connections, :"$1"), do: x
+ for [x] <- :ets.match(:connections, :"$1"), do: x
end
end