aboutsummaryrefslogtreecommitdiff
path: root/lib/app/chat.ex
diff options
context:
space:
mode:
Diffstat (limited to 'lib/app/chat.ex')
-rw-r--r--lib/app/chat.ex57
1 files changed, 23 insertions, 34 deletions
diff --git a/lib/app/chat.ex b/lib/app/chat.ex
index bc9f5de..85f5265 100644
--- a/lib/app/chat.ex
+++ b/lib/app/chat.ex
@@ -36,18 +36,21 @@ defmodule SApp.Chat do
manifest = {:chat, channel}
id = SData.term_hash manifest
- GenServer.cast(Shard.Manager, {:register, id, manifest, self()})
- GenServer.cast(self(), :init_pull)
-
- {:ok,
- %{channel: channel,
- id: id,
- manifest: manifest,
- store: store,
- peers: MapSet.new,
- subs: MapSet.new,
- }
- }
+ case Shard.Manager.register(id, manifest, self()) do
+ :ok ->
+ Shard.Manager.dispatch_to(id, nil, self())
+ GenServer.cast(self(), :init_pull)
+ {:ok,
+ %{channel: channel,
+ id: id,
+ manifest: manifest,
+ store: store,
+ subs: MapSet.new,
+ }
+ }
+ :redundant ->
+ exit(:normal)
+ end
end
@doc """
@@ -63,14 +66,6 @@ defmodule SApp.Chat do
end
@doc """
- Implementation of the :redundant handler: if another process is already
- synchronizing this channel then we exit.
- """
- def handle_cast({:redundant, _}, _state) do
- exit :normal
- end
-
- @doc """
Implementation of the :init_pull handler, which is called when the
process starts. It contacts all currently connected peers and asks them to
send data for this channel if they have some.
@@ -99,8 +94,8 @@ defmodule SApp.Chat do
end
end
- for peer <- state.peers do
- push_messages(new_state, peer, nil, 5)
+ for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do
+ push_messages(new_state, peer_id, nil, 5)
end
{:noreply, new_state}
@@ -112,8 +107,7 @@ defmodule SApp.Chat do
"""
def handle_cast({:interested, peer_id}, state) do
push_messages(state, peer_id, nil, 10)
- new_peers = MapSet.put(state.peers, peer_id)
- {:noreply, %{ state | peers: new_peers }}
+ {:noreply, state}
end
def handle_cast({:subscribe, pid}, state) do
@@ -131,14 +125,14 @@ defmodule SApp.Chat do
- `{:info, start, list, rest}`: put some messages and informs of the
Merkle hash of the store of older messages.
"""
- def handle_cast({:msg, peer_id, msg}, state) do
+ def handle_cast({:msg, peer_id, _shard_id, nil, msg}, state) do
case msg do
{:get_manifest} ->
- Shard.Manager.send(peer_id, {state.id, {:manifest, state.manifest}})
+ Shard.Manager.send(peer_id, {state.id, nil, {:manifest, state.manifest}})
{:get, start} -> push_messages(state, peer_id, start, 20)
{:info, _start, list, rest} ->
if rest != nil and not ML.has(state.store, rest) do
- Shard.Manager.send(peer_id, {state.id, {:get, rest}})
+ Shard.Manager.send(peer_id, {state.id, nil, {:get, rest}})
end
who = self()
spawn_link(fn ->
@@ -147,12 +141,7 @@ defmodule SApp.Chat do
end)
_ -> nil
end
-
- if MapSet.member?(state.peers, peer_id) do
- {:noreply, state}
- else
- handle_cast({:interested, peer_id}, state)
- end
+ {:noreply, state}
end
def handle_cast({:deferred_insert, list}, state) do
@@ -168,7 +157,7 @@ defmodule SApp.Chat do
defp push_messages(state, to, start, num) do
case ML.read(state.store, start, num) do
{:ok, list, rest} ->
- Shard.Manager.send(to, {state.id, {:info, start, list, rest}})
+ Shard.Manager.send(to, {state.id, nil, {:info, start, list, rest}})
_ -> nil
end
end