diff options
Diffstat (limited to 'shard/lib/manager.ex')
-rw-r--r-- | shard/lib/manager.ex | 52 |
1 files changed, 28 insertions, 24 deletions
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex index dd602bf..3f5416d 100644 --- a/shard/lib/manager.ex +++ b/shard/lib/manager.ex @@ -51,10 +51,11 @@ defmodule Shard.Manager do - :outbox (not persistent) Multi-list of - { dest, auth_info, message, time_inserted } + { dest_peer_info, message, time_inserted } dest := peer_info - auth_info := nil | { his_pk, my_pk_list } + + No support for messages on authenticated channels """ @@ -137,11 +138,11 @@ defmodule Shard.Manager do id_list = (for [{id, _, _}] <- :dets.match(@shard_db, :"$1"), do: id) GenServer.cast(pid, {:send_msg, {:interested, id_list}}) - # # Send queued messages - # for {_, msg, _} <- :ets.lookup(state.outbox, pk) do - # GenServer.cast(pid, {:send_msg, msg}) - # end - # :ets.delete(state.outbox, pk) + # Send queued messages + for {_, msg, _} <- :ets.lookup(state.outbox, peer_info) do + GenServer.cast(pid, {:send_msg, msg}) + end + :ets.delete(state.outbox, peer_info) {:noreply, state} end @@ -151,19 +152,17 @@ defmodule Shard.Manager do {:noreply, state} end - # def handle_cast({:connect_and_send, peer_id, msg}, state) do - # case :dets.lookup(@peer_db, peer_id) do - # [{^peer_id, nil, ip, port}] -> - # add_peer(ip, port, state) - # currtime = System.os_time :second - # :ets.insert(state.outbox, {peer_id, msg, currtime}) - # outbox_cleanup = [ {{:_, :_, :'$1'}, [{:<, :'$1', currtime - 60}], [true]} ] - # :ets.select_delete(state.outbox, outbox_cleanup) - # _ -> - # Logger.info "Dropping message #{inspect msg} for peer #{inspect peer_id}: peer not in database" - # end - # {:noreply, state} - # end + def handle_cast({:connect_and_send, peer_info, msg}, state) do + case peer_info do + {:tcp4, ip, port} -> + add_peer(ip, port, state) + currtime = System.os_time :second + :ets.insert(state.outbox, {peer_info, msg, currtime}) + outbox_cleanup = [ {{:_, :_, :'$1'}, [{:<, :'$1', currtime - 60}], [true]} ] + :ets.select_delete(state.outbox, outbox_cleanup) + end + {:noreply, state} + end def handle_cast({:try_connect, pk_list}, state) do for pk <- pk_list do @@ -249,8 +248,13 @@ defmodule Shard.Manager do Send message to a peer specified by peer info. Opens a connection if necessary. """ - def send(_peer_info, _msg) do - # TODO + def send(peer_info, msg) do + case :ets.lookup(:connections, peer_info) do + [{^peer_info, pid, _auth}|_] -> + GenServer.cast(pid, {:send_msg, msg}) + [] -> + GenServer.cast(__MODULE__, {:connect_and_send, peer_info, msg}) + end end @doc""" @@ -273,7 +277,7 @@ defmodule Shard.Manager do Return the list of all peer info for peers that are interested in a certain shard """ def get_shard_peers(shard_id) do - for [x] <- :dets.match(@peer_db, {shard_id, :"$1"}), do: x + for {_, peer_info} <- :dets.lookup(@peer_db, shard_id), do: peer_info end @doc""" @@ -326,6 +330,6 @@ defmodule Shard.Manager do Return the list of all connected peers """ def list_connections() do - for [x] <- :dets.match(:connections, :"$1"), do: x + for [x] <- :ets.match(:connections, :"$1"), do: x end end |