aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/app/chat.ex57
-rw-r--r--lib/manager.ex116
-rw-r--r--lib/net/tcpconn.ex28
3 files changed, 128 insertions, 73 deletions
diff --git a/lib/app/chat.ex b/lib/app/chat.ex
index bc9f5de..85f5265 100644
--- a/lib/app/chat.ex
+++ b/lib/app/chat.ex
@@ -36,18 +36,21 @@ defmodule SApp.Chat do
manifest = {:chat, channel}
id = SData.term_hash manifest
- GenServer.cast(Shard.Manager, {:register, id, manifest, self()})
- GenServer.cast(self(), :init_pull)
-
- {:ok,
- %{channel: channel,
- id: id,
- manifest: manifest,
- store: store,
- peers: MapSet.new,
- subs: MapSet.new,
- }
- }
+ case Shard.Manager.register(id, manifest, self()) do
+ :ok ->
+ Shard.Manager.dispatch_to(id, nil, self())
+ GenServer.cast(self(), :init_pull)
+ {:ok,
+ %{channel: channel,
+ id: id,
+ manifest: manifest,
+ store: store,
+ subs: MapSet.new,
+ }
+ }
+ :redundant ->
+ exit(:normal)
+ end
end
@doc """
@@ -63,14 +66,6 @@ defmodule SApp.Chat do
end
@doc """
- Implementation of the :redundant handler: if another process is already
- synchronizing this channel then we exit.
- """
- def handle_cast({:redundant, _}, _state) do
- exit :normal
- end
-
- @doc """
Implementation of the :init_pull handler, which is called when the
process starts. It contacts all currently connected peers and asks them to
send data for this channel if they have some.
@@ -99,8 +94,8 @@ defmodule SApp.Chat do
end
end
- for peer <- state.peers do
- push_messages(new_state, peer, nil, 5)
+ for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do
+ push_messages(new_state, peer_id, nil, 5)
end
{:noreply, new_state}
@@ -112,8 +107,7 @@ defmodule SApp.Chat do
"""
def handle_cast({:interested, peer_id}, state) do
push_messages(state, peer_id, nil, 10)
- new_peers = MapSet.put(state.peers, peer_id)
- {:noreply, %{ state | peers: new_peers }}
+ {:noreply, state}
end
def handle_cast({:subscribe, pid}, state) do
@@ -131,14 +125,14 @@ defmodule SApp.Chat do
- `{:info, start, list, rest}`: put some messages and informs of the
Merkle hash of the store of older messages.
"""
- def handle_cast({:msg, peer_id, msg}, state) do
+ def handle_cast({:msg, peer_id, _shard_id, nil, msg}, state) do
case msg do
{:get_manifest} ->
- Shard.Manager.send(peer_id, {state.id, {:manifest, state.manifest}})
+ Shard.Manager.send(peer_id, {state.id, nil, {:manifest, state.manifest}})
{:get, start} -> push_messages(state, peer_id, start, 20)
{:info, _start, list, rest} ->
if rest != nil and not ML.has(state.store, rest) do
- Shard.Manager.send(peer_id, {state.id, {:get, rest}})
+ Shard.Manager.send(peer_id, {state.id, nil, {:get, rest}})
end
who = self()
spawn_link(fn ->
@@ -147,12 +141,7 @@ defmodule SApp.Chat do
end)
_ -> nil
end
-
- if MapSet.member?(state.peers, peer_id) do
- {:noreply, state}
- else
- handle_cast({:interested, peer_id}, state)
- end
+ {:noreply, state}
end
def handle_cast({:deferred_insert, list}, state) do
@@ -168,7 +157,7 @@ defmodule SApp.Chat do
defp push_messages(state, to, start, num) do
case ML.read(state.store, start, num) do
{:ok, list, rest} ->
- Shard.Manager.send(to, {state.id, {:info, start, list, rest}})
+ Shard.Manager.send(to, {state.id, nil, {:info, start, list, rest}})
_ -> nil
end
end
diff --git a/lib/manager.ex b/lib/manager.ex
index 87f95c5..b547312 100644
--- a/lib/manager.ex
+++ b/lib/manager.ex
@@ -1,6 +1,6 @@
defmodule Shard.Manager do
@moduledoc"""
- Maintains two important tables :
+ Maintains several important tables :
- :peer_db
@@ -10,14 +10,20 @@ defmodule Shard.Manager do
- :shard_db
List of
- { id, manifest, shard_pid }
+ { id, manifest, pid | nil }
- And also some others :
+ - :shard_procs
- - :peer_shard_db
+ List of
+ { {id, path}, pid }
+
+ - :shard_peer_db
Mult-list of
- { peer_id, shard_id }
+ { shard_id, peer_id }
+
+
+ And an internal table :
- :outbox
@@ -37,10 +43,11 @@ defmodule Shard.Manager do
def init(my_port) do
:ets.new(:peer_db, [:set, :protected, :named_table])
:ets.new(:shard_db, [:set, :protected, :named_table])
- peer_shard_db = :ets.new(:peer_shard_db, [:bag, :private])
+ :ets.new(:shard_procs, [:set, :protected, :named_table])
+ :ets.new(:shard_peer_db, [:bag, :protected, :named_table])
outbox = :ets.new(:outbox, [:bag, :private])
- {:ok, %{my_port: my_port, peer_shard_db: peer_shard_db, outbox: outbox} }
+ {:ok, %{my_port: my_port, outbox: outbox} }
end
def handle_call({:find, shard_id}, _from, state) do
@@ -51,16 +58,24 @@ defmodule Shard.Manager do
{:reply, reply, state}
end
- def handle_cast({:register, shard_id, manifest, pid}, state) do
+ def handle_call({:register, shard_id, manifest, pid}, _from, state) do
will_live = case :ets.lookup(:shard_db, shard_id) do
[{ ^shard_id, _, pid }] -> not Process.alive?(pid)
_ -> true
end
- if will_live do
+ reply = if will_live do
+ Process.monitor(pid)
:ets.insert(:shard_db, {shard_id, manifest, pid})
+ :ok
else
- GenServer.cast(pid, {:redundant, shard_id})
+ :redundant
end
+ {:reply, reply, state}
+ end
+
+ def handle_cast({:dispatch_to, shard_id, path, pid}, state) do
+ :ets.insert(:shard_procs, { {shard_id, path}, pid })
+ Process.monitor(pid)
{:noreply, state}
end
@@ -68,7 +83,7 @@ defmodule Shard.Manager do
for shard_id <- shards do
case :ets.lookup(:shard_db, shard_id) do
[{ ^shard_id, _, pid }] ->
- :ets.insert(state.peer_shard_db, {peer_id, shard_id})
+ :ets.insert(:shard_peer_db, {shard_id, peer_id})
GenServer.cast(pid, {:interested, peer_id})
[] -> nil
end
@@ -76,14 +91,29 @@ defmodule Shard.Manager do
{:noreply, state}
end
+ def handle_cast({:not_interested, peer_id, shard_id}, state) do
+ :ets.match_delete(:shard_peer_db, {shard_id, peer_id})
+ {:noreply, state}
+ end
+ def handle_cast({:shard_peer_db_insert, shard_id, peer_id}, state) do
+ :ets.insert(:shard_peer_db, {shard_id, peer_id})
+ {:noreply, state}
+ end
def handle_cast({:peer_up, pk, pid, ip, port}, state) do
:ets.insert(:peer_db, {pk, pid, ip, port})
+
+ # Send interested message for all our shards
+ id_list = (for {id, _, _} <- :ets.tab2list(:shard_db), do: id)
+ GenServer.cast(pid, {:send_msg, {:interested, id_list}})
+
+ # Send queued messages
for {_, msg, _} <- :ets.lookup(state.outbox, pk) do
GenServer.cast(pid, {:send_msg, msg})
end
:ets.delete(state.outbox, pk)
+
{:noreply, state}
end
@@ -123,6 +153,11 @@ defmodule Shard.Manager do
{:noreply, state}
end
+ def handle_info({:DOWN, _, :process, pid, _}, state) do
+ :ets.match_delete(:shard_procs, {:_, pid})
+ {:noreply, state}
+ end
+
defp add_peer(ip, port, state) do
spawn fn ->
case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do
@@ -135,10 +170,22 @@ defmodule Shard.Manager do
end
end
+
+ # ================
+ # PUBLIC INTERFACE
+ # ================
+
+
+ @doc"""
+ Connect to a peer specified by ip address and port
+ """
def add_peer(ip, port) do
GenServer.cast(__MODULE__, {:add_peer, ip, port})
end
+ @doc"""
+ Send message to a peer specified by peer id
+ """
def send(peer_id, msg) do
case :ets.lookup(:peer_db, peer_id) do
[{ ^peer_id, pid, _, _}] when pid != nil->
@@ -148,12 +195,49 @@ defmodule Shard.Manager do
end
end
- def dispatch(peer_id, shard_id, msg) do
+ @doc"""
+ Dispatch incoming message to correct shard process
+ """
+ def dispatch(peer_id, {shard_id, path, msg}) do
case :ets.lookup(:shard_db, shard_id) do
- [{ ^shard_id, _, pid }] when pid != nil ->
- GenServer.cast(pid, {:msg, peer_id, msg})
- [_] -> nil # TODO restart shard
- [] -> nil # TODO send not interested
+ [] ->
+ __MODULE__.send(peer_id, {:not_interested, shard_id})
+ [_] ->
+ case :ets.match(:shard_peer_db, {shard_id, peer_id}) do
+ [] ->
+ GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_id})
+ _ -> nil
+ end
+ case :ets.lookup(:shard_procs, {shard_id, path}) do
+ [{ {^shard_id, ^path}, pid }] ->
+ GenServer.cast(pid, {:msg, peer_id, shard_id, path, msg})
+ [] ->
+ Logger.info("Warning: dropping message for #{inspect shard_id}/#{inspect path}, no handler running.\n\t#{inspect msg}")
+ end
end
end
+
+ def dispatch(peer_id, {:interested, shards}) do
+ GenServer.cast(__MODULE__, {:interested, peer_id, shards})
+ end
+
+ def dispatch(peer_id, {:not_interested, shard}) do
+ GenServer.cast(__MODULE__, {:not_interested, peer_id, shard})
+ end
+
+ @doc"""
+ Register a process as the main process for a shard.
+
+ Returns either :ok or :redundant, in which case the process must exit.
+ """
+ def register(shard_id, manifest, pid) do
+ GenServer.call(__MODULE__, {:register, shard_id, manifest, pid})
+ end
+
+ @doc"""
+ Register a process as the handler for shard packets for a given path.
+ """
+ def dispatch_to(shard_id, path, pid) do
+ GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid})
+ end
end
diff --git a/lib/net/tcpconn.ex b/lib/net/tcpconn.ex
index 7d82601..6fc4b1a 100644
--- a/lib/net/tcpconn.ex
+++ b/lib/net/tcpconn.ex
@@ -56,19 +56,14 @@ defmodule SNet.TCPConn do
}
GenServer.cast(Shard.Manager, {:peer_up, cli_pkey, self(), addr, his_port})
Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}"
- GenServer.cast(self(), :init_pull)
{:noreply, state}
end
def handle_cast({:send_msg, msg}, state) do
- send_msg(state, msg)
- {:noreply, state}
- end
-
- def handle_cast(:init_pull, state) do
- id_list = (for {id, _, _} <- :ets.tab2list(:shard_db), do: id)
- send_msg(state, {:interested, id_list})
+ msgbin = :erlang.term_to_binary msg
+ enc = encode_pkt(msgbin, state.conn_his_pkey, state.conn_my_skey)
+ :gen_tcp.send(state.socket, enc)
{:noreply, state}
end
@@ -85,15 +80,10 @@ defmodule SNet.TCPConn do
msg
end
- defp send_msg(state, msg) do
- msgbin = :erlang.term_to_binary msg
- enc = encode_pkt(msgbin, state.conn_his_pkey, state.conn_my_skey)
- :gen_tcp.send(state.socket, enc)
- end
-
def handle_info({:tcp, _socket, raw_data}, state) do
msg = decode_pkt(raw_data, state.conn_his_pkey, state.conn_my_skey)
- handle_packet(:erlang.binary_to_term(msg, [:safe]), state)
+ msg_data = :erlang.binary_to_term(msg, [:safe])
+ Shard.Manager.dispatch(state.his_pkey, msg_data)
{:noreply, state}
end
@@ -103,14 +93,6 @@ defmodule SNet.TCPConn do
exit(:normal)
end
- defp handle_packet({:interested, shards}, state) do
- GenServer.cast(Shard.Manager, {:interested, state.his_pkey, shards})
- end
-
- defp handle_packet({shard, msg}, state) do
- Shard.Manager.dispatch(state.his_pkey, shard, msg)
- end
-
defp print_id(state) do
state.his_pkey
|> binary_part(0, 8)