diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/app/chat.ex | 57 | ||||
-rw-r--r-- | lib/manager.ex | 116 | ||||
-rw-r--r-- | lib/net/tcpconn.ex | 28 |
3 files changed, 128 insertions, 73 deletions
diff --git a/lib/app/chat.ex b/lib/app/chat.ex index bc9f5de..85f5265 100644 --- a/lib/app/chat.ex +++ b/lib/app/chat.ex @@ -36,18 +36,21 @@ defmodule SApp.Chat do manifest = {:chat, channel} id = SData.term_hash manifest - GenServer.cast(Shard.Manager, {:register, id, manifest, self()}) - GenServer.cast(self(), :init_pull) - - {:ok, - %{channel: channel, - id: id, - manifest: manifest, - store: store, - peers: MapSet.new, - subs: MapSet.new, - } - } + case Shard.Manager.register(id, manifest, self()) do + :ok -> + Shard.Manager.dispatch_to(id, nil, self()) + GenServer.cast(self(), :init_pull) + {:ok, + %{channel: channel, + id: id, + manifest: manifest, + store: store, + subs: MapSet.new, + } + } + :redundant -> + exit(:normal) + end end @doc """ @@ -63,14 +66,6 @@ defmodule SApp.Chat do end @doc """ - Implementation of the :redundant handler: if another process is already - synchronizing this channel then we exit. - """ - def handle_cast({:redundant, _}, _state) do - exit :normal - end - - @doc """ Implementation of the :init_pull handler, which is called when the process starts. It contacts all currently connected peers and asks them to send data for this channel if they have some. @@ -99,8 +94,8 @@ defmodule SApp.Chat do end end - for peer <- state.peers do - push_messages(new_state, peer, nil, 5) + for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do + push_messages(new_state, peer_id, nil, 5) end {:noreply, new_state} @@ -112,8 +107,7 @@ defmodule SApp.Chat do """ def handle_cast({:interested, peer_id}, state) do push_messages(state, peer_id, nil, 10) - new_peers = MapSet.put(state.peers, peer_id) - {:noreply, %{ state | peers: new_peers }} + {:noreply, state} end def handle_cast({:subscribe, pid}, state) do @@ -131,14 +125,14 @@ defmodule SApp.Chat do - `{:info, start, list, rest}`: put some messages and informs of the Merkle hash of the store of older messages. """ - def handle_cast({:msg, peer_id, msg}, state) do + def handle_cast({:msg, peer_id, _shard_id, nil, msg}, state) do case msg do {:get_manifest} -> - Shard.Manager.send(peer_id, {state.id, {:manifest, state.manifest}}) + Shard.Manager.send(peer_id, {state.id, nil, {:manifest, state.manifest}}) {:get, start} -> push_messages(state, peer_id, start, 20) {:info, _start, list, rest} -> if rest != nil and not ML.has(state.store, rest) do - Shard.Manager.send(peer_id, {state.id, {:get, rest}}) + Shard.Manager.send(peer_id, {state.id, nil, {:get, rest}}) end who = self() spawn_link(fn -> @@ -147,12 +141,7 @@ defmodule SApp.Chat do end) _ -> nil end - - if MapSet.member?(state.peers, peer_id) do - {:noreply, state} - else - handle_cast({:interested, peer_id}, state) - end + {:noreply, state} end def handle_cast({:deferred_insert, list}, state) do @@ -168,7 +157,7 @@ defmodule SApp.Chat do defp push_messages(state, to, start, num) do case ML.read(state.store, start, num) do {:ok, list, rest} -> - Shard.Manager.send(to, {state.id, {:info, start, list, rest}}) + Shard.Manager.send(to, {state.id, nil, {:info, start, list, rest}}) _ -> nil end end diff --git a/lib/manager.ex b/lib/manager.ex index 87f95c5..b547312 100644 --- a/lib/manager.ex +++ b/lib/manager.ex @@ -1,6 +1,6 @@ defmodule Shard.Manager do @moduledoc""" - Maintains two important tables : + Maintains several important tables : - :peer_db @@ -10,14 +10,20 @@ defmodule Shard.Manager do - :shard_db List of - { id, manifest, shard_pid } + { id, manifest, pid | nil } - And also some others : + - :shard_procs - - :peer_shard_db + List of + { {id, path}, pid } + + - :shard_peer_db Mult-list of - { peer_id, shard_id } + { shard_id, peer_id } + + + And an internal table : - :outbox @@ -37,10 +43,11 @@ defmodule Shard.Manager do def init(my_port) do :ets.new(:peer_db, [:set, :protected, :named_table]) :ets.new(:shard_db, [:set, :protected, :named_table]) - peer_shard_db = :ets.new(:peer_shard_db, [:bag, :private]) + :ets.new(:shard_procs, [:set, :protected, :named_table]) + :ets.new(:shard_peer_db, [:bag, :protected, :named_table]) outbox = :ets.new(:outbox, [:bag, :private]) - {:ok, %{my_port: my_port, peer_shard_db: peer_shard_db, outbox: outbox} } + {:ok, %{my_port: my_port, outbox: outbox} } end def handle_call({:find, shard_id}, _from, state) do @@ -51,16 +58,24 @@ defmodule Shard.Manager do {:reply, reply, state} end - def handle_cast({:register, shard_id, manifest, pid}, state) do + def handle_call({:register, shard_id, manifest, pid}, _from, state) do will_live = case :ets.lookup(:shard_db, shard_id) do [{ ^shard_id, _, pid }] -> not Process.alive?(pid) _ -> true end - if will_live do + reply = if will_live do + Process.monitor(pid) :ets.insert(:shard_db, {shard_id, manifest, pid}) + :ok else - GenServer.cast(pid, {:redundant, shard_id}) + :redundant end + {:reply, reply, state} + end + + def handle_cast({:dispatch_to, shard_id, path, pid}, state) do + :ets.insert(:shard_procs, { {shard_id, path}, pid }) + Process.monitor(pid) {:noreply, state} end @@ -68,7 +83,7 @@ defmodule Shard.Manager do for shard_id <- shards do case :ets.lookup(:shard_db, shard_id) do [{ ^shard_id, _, pid }] -> - :ets.insert(state.peer_shard_db, {peer_id, shard_id}) + :ets.insert(:shard_peer_db, {shard_id, peer_id}) GenServer.cast(pid, {:interested, peer_id}) [] -> nil end @@ -76,14 +91,29 @@ defmodule Shard.Manager do {:noreply, state} end + def handle_cast({:not_interested, peer_id, shard_id}, state) do + :ets.match_delete(:shard_peer_db, {shard_id, peer_id}) + {:noreply, state} + end + def handle_cast({:shard_peer_db_insert, shard_id, peer_id}, state) do + :ets.insert(:shard_peer_db, {shard_id, peer_id}) + {:noreply, state} + end def handle_cast({:peer_up, pk, pid, ip, port}, state) do :ets.insert(:peer_db, {pk, pid, ip, port}) + + # Send interested message for all our shards + id_list = (for {id, _, _} <- :ets.tab2list(:shard_db), do: id) + GenServer.cast(pid, {:send_msg, {:interested, id_list}}) + + # Send queued messages for {_, msg, _} <- :ets.lookup(state.outbox, pk) do GenServer.cast(pid, {:send_msg, msg}) end :ets.delete(state.outbox, pk) + {:noreply, state} end @@ -123,6 +153,11 @@ defmodule Shard.Manager do {:noreply, state} end + def handle_info({:DOWN, _, :process, pid, _}, state) do + :ets.match_delete(:shard_procs, {:_, pid}) + {:noreply, state} + end + defp add_peer(ip, port, state) do spawn fn -> case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do @@ -135,10 +170,22 @@ defmodule Shard.Manager do end end + + # ================ + # PUBLIC INTERFACE + # ================ + + + @doc""" + Connect to a peer specified by ip address and port + """ def add_peer(ip, port) do GenServer.cast(__MODULE__, {:add_peer, ip, port}) end + @doc""" + Send message to a peer specified by peer id + """ def send(peer_id, msg) do case :ets.lookup(:peer_db, peer_id) do [{ ^peer_id, pid, _, _}] when pid != nil-> @@ -148,12 +195,49 @@ defmodule Shard.Manager do end end - def dispatch(peer_id, shard_id, msg) do + @doc""" + Dispatch incoming message to correct shard process + """ + def dispatch(peer_id, {shard_id, path, msg}) do case :ets.lookup(:shard_db, shard_id) do - [{ ^shard_id, _, pid }] when pid != nil -> - GenServer.cast(pid, {:msg, peer_id, msg}) - [_] -> nil # TODO restart shard - [] -> nil # TODO send not interested + [] -> + __MODULE__.send(peer_id, {:not_interested, shard_id}) + [_] -> + case :ets.match(:shard_peer_db, {shard_id, peer_id}) do + [] -> + GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_id}) + _ -> nil + end + case :ets.lookup(:shard_procs, {shard_id, path}) do + [{ {^shard_id, ^path}, pid }] -> + GenServer.cast(pid, {:msg, peer_id, shard_id, path, msg}) + [] -> + Logger.info("Warning: dropping message for #{inspect shard_id}/#{inspect path}, no handler running.\n\t#{inspect msg}") + end end end + + def dispatch(peer_id, {:interested, shards}) do + GenServer.cast(__MODULE__, {:interested, peer_id, shards}) + end + + def dispatch(peer_id, {:not_interested, shard}) do + GenServer.cast(__MODULE__, {:not_interested, peer_id, shard}) + end + + @doc""" + Register a process as the main process for a shard. + + Returns either :ok or :redundant, in which case the process must exit. + """ + def register(shard_id, manifest, pid) do + GenServer.call(__MODULE__, {:register, shard_id, manifest, pid}) + end + + @doc""" + Register a process as the handler for shard packets for a given path. + """ + def dispatch_to(shard_id, path, pid) do + GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid}) + end end diff --git a/lib/net/tcpconn.ex b/lib/net/tcpconn.ex index 7d82601..6fc4b1a 100644 --- a/lib/net/tcpconn.ex +++ b/lib/net/tcpconn.ex @@ -56,19 +56,14 @@ defmodule SNet.TCPConn do } GenServer.cast(Shard.Manager, {:peer_up, cli_pkey, self(), addr, his_port}) Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}" - GenServer.cast(self(), :init_pull) {:noreply, state} end def handle_cast({:send_msg, msg}, state) do - send_msg(state, msg) - {:noreply, state} - end - - def handle_cast(:init_pull, state) do - id_list = (for {id, _, _} <- :ets.tab2list(:shard_db), do: id) - send_msg(state, {:interested, id_list}) + msgbin = :erlang.term_to_binary msg + enc = encode_pkt(msgbin, state.conn_his_pkey, state.conn_my_skey) + :gen_tcp.send(state.socket, enc) {:noreply, state} end @@ -85,15 +80,10 @@ defmodule SNet.TCPConn do msg end - defp send_msg(state, msg) do - msgbin = :erlang.term_to_binary msg - enc = encode_pkt(msgbin, state.conn_his_pkey, state.conn_my_skey) - :gen_tcp.send(state.socket, enc) - end - def handle_info({:tcp, _socket, raw_data}, state) do msg = decode_pkt(raw_data, state.conn_his_pkey, state.conn_my_skey) - handle_packet(:erlang.binary_to_term(msg, [:safe]), state) + msg_data = :erlang.binary_to_term(msg, [:safe]) + Shard.Manager.dispatch(state.his_pkey, msg_data) {:noreply, state} end @@ -103,14 +93,6 @@ defmodule SNet.TCPConn do exit(:normal) end - defp handle_packet({:interested, shards}, state) do - GenServer.cast(Shard.Manager, {:interested, state.his_pkey, shards}) - end - - defp handle_packet({shard, msg}, state) do - Shard.Manager.dispatch(state.his_pkey, shard, msg) - end - defp print_id(state) do state.his_pkey |> binary_part(0, 8) |