From 981f15fe87bf268a55e5caabbc2f4e4d81cae873 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 31 Aug 2018 17:51:30 +0200 Subject: Some refactoring, more functionality for the manager --- lib/app/chat.ex | 57 +++++++++++++++++++++++---------------------------------- 1 file changed, 23 insertions(+), 34 deletions(-) (limited to 'lib/app/chat.ex') diff --git a/lib/app/chat.ex b/lib/app/chat.ex index bc9f5de..85f5265 100644 --- a/lib/app/chat.ex +++ b/lib/app/chat.ex @@ -36,18 +36,21 @@ defmodule SApp.Chat do manifest = {:chat, channel} id = SData.term_hash manifest - GenServer.cast(Shard.Manager, {:register, id, manifest, self()}) - GenServer.cast(self(), :init_pull) - - {:ok, - %{channel: channel, - id: id, - manifest: manifest, - store: store, - peers: MapSet.new, - subs: MapSet.new, - } - } + case Shard.Manager.register(id, manifest, self()) do + :ok -> + Shard.Manager.dispatch_to(id, nil, self()) + GenServer.cast(self(), :init_pull) + {:ok, + %{channel: channel, + id: id, + manifest: manifest, + store: store, + subs: MapSet.new, + } + } + :redundant -> + exit(:normal) + end end @doc """ @@ -62,14 +65,6 @@ defmodule SApp.Chat do {:reply, ret, state} end - @doc """ - Implementation of the :redundant handler: if another process is already - synchronizing this channel then we exit. - """ - def handle_cast({:redundant, _}, _state) do - exit :normal - end - @doc """ Implementation of the :init_pull handler, which is called when the process starts. It contacts all currently connected peers and asks them to @@ -99,8 +94,8 @@ defmodule SApp.Chat do end end - for peer <- state.peers do - push_messages(new_state, peer, nil, 5) + for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do + push_messages(new_state, peer_id, nil, 5) end {:noreply, new_state} @@ -112,8 +107,7 @@ defmodule SApp.Chat do """ def handle_cast({:interested, peer_id}, state) do push_messages(state, peer_id, nil, 10) - new_peers = MapSet.put(state.peers, peer_id) - {:noreply, %{ state | peers: new_peers }} + {:noreply, state} end def handle_cast({:subscribe, pid}, state) do @@ -131,14 +125,14 @@ defmodule SApp.Chat do - `{:info, start, list, rest}`: put some messages and informs of the Merkle hash of the store of older messages. """ - def handle_cast({:msg, peer_id, msg}, state) do + def handle_cast({:msg, peer_id, _shard_id, nil, msg}, state) do case msg do {:get_manifest} -> - Shard.Manager.send(peer_id, {state.id, {:manifest, state.manifest}}) + Shard.Manager.send(peer_id, {state.id, nil, {:manifest, state.manifest}}) {:get, start} -> push_messages(state, peer_id, start, 20) {:info, _start, list, rest} -> if rest != nil and not ML.has(state.store, rest) do - Shard.Manager.send(peer_id, {state.id, {:get, rest}}) + Shard.Manager.send(peer_id, {state.id, nil, {:get, rest}}) end who = self() spawn_link(fn -> @@ -147,12 +141,7 @@ defmodule SApp.Chat do end) _ -> nil end - - if MapSet.member?(state.peers, peer_id) do - {:noreply, state} - else - handle_cast({:interested, peer_id}, state) - end + {:noreply, state} end def handle_cast({:deferred_insert, list}, state) do @@ -168,7 +157,7 @@ defmodule SApp.Chat do defp push_messages(state, to, start, num) do case ML.read(state.store, start, num) do {:ok, list, rest} -> - Shard.Manager.send(to, {state.id, {:info, start, list, rest}}) + Shard.Manager.send(to, {state.id, nil, {:info, start, list, rest}}) _ -> nil end end -- cgit v1.2.3