diff options
author | Alex Auvolat <alex@adnab.me> | 2018-10-11 14:22:00 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2018-10-11 14:22:00 +0200 |
commit | eb8c949551ffb8b3600357d7ff2bebe750af96e5 (patch) | |
tree | 36a0b46219ae595313b569c5885c470a0ddaca0e /shard/lib/app/chat.ex | |
parent | 7b6042205e7c6135fae4e0d21dbf7a5975e8491b (diff) | |
download | shard-eb8c949551ffb8b3600357d7ff2bebe750af96e5.tar.gz shard-eb8c949551ffb8b3600357d7ff2bebe750af96e5.zip |
Address, broadcast group management
Diffstat (limited to 'shard/lib/app/chat.ex')
-rw-r--r-- | shard/lib/app/chat.ex | 123 |
1 files changed, 59 insertions, 64 deletions
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex index 0f4d573..051dfef 100644 --- a/shard/lib/app/chat.ex +++ b/shard/lib/app/chat.ex @@ -5,7 +5,7 @@ defmodule SApp.Chat do Chat rooms are globally identified by their channel name. A chat room manifest is of the form: - {:chat, channel_name} + %SApp.Chat.Manifest{channel: channel_name} Future improvements: - message signing @@ -28,7 +28,7 @@ defmodule SApp.Chat do defimpl Shard.Manifest, for: Manifest do def start(m) do - DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, m.channel}) + DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, m}) end end @@ -36,15 +36,15 @@ defmodule SApp.Chat do @doc """ Start a process that connects to a given channel """ - def start_link(channel) do - GenServer.start_link(__MODULE__, channel) + def start_link(manifest) do + GenServer.start_link(__MODULE__, manifest) end @doc """ Initialize channel process. """ - def init(channel) do - manifest = %Manifest{channel: channel} + def init(manifest) do + %Manifest{channel: channel} = manifest id = SData.term_hash manifest case Shard.Manager.register(id, manifest, self()) do @@ -62,10 +62,12 @@ defmodule SApp.Chat do mst = %MST{store: %SApp.PageStore{pid: page_store}, cmp: &msg_cmp/2, root: root} - GenServer.cast(self(), :init_pull) + group = %SNet.PubShardGroup{id: id} + SNet.Group.init_lookup(group, self()) {:ok, %{channel: channel, id: id, + group: group, manifest: manifest, page_store: page_store, mst: mst, @@ -96,18 +98,6 @@ defmodule SApp.Chat do end @doc """ - Implementation of the :init_pull handler, which is called when the - process starts. It contacts all currently connected peers and asks them to - send data for this channel if they have some. - """ - def handle_cast(:init_pull, state) do - for {_, pid, _} <- Shard.Manager.list_connections do - GenServer.cast(pid, {:send_msg, {:interested, [state.id]}}) - end - {:noreply, state} - end - - @doc """ Implementation of the :chat_send handler. This is the main handler that is used to send a message to the chat room. Puts the message in the store and syncs with all connected peers. @@ -129,9 +119,7 @@ defmodule SApp.Chat do end notif = {:append, prev_root, msgitem, mst.root} - for peer_info <- Shard.Manager.get_shard_peers(state.id) do - Shard.Manager.send(peer_info, {state.id, nil, notif}) - end + SNet.Group.broadcast(state.group, notif) {:noreply, state} end @@ -140,8 +128,10 @@ defmodule SApp.Chat do Implementation of the :interested handler, this is called when a peer we are connected to asks to recieve data for this channel. """ - def handle_cast({:interested, conn_pid, _auth}, state) do - Shard.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root}}) + def handle_cast({:interested, conn_pid, auth}, state) do + if SNet.Group.in_group?(state.group, conn_pid, auth) do + Shard.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root}}) + end {:noreply, state} end @@ -160,54 +150,59 @@ defmodule SApp.Chat do - `{:info, start, list, rest}`: put some messages and informs of the Merkle hash of the store of older messages. """ - def handle_cast({:msg, conn_pid, _auth, _shard_id, nil, msg}, state) do - state = case msg do - {:get_manifest} -> - Shard.Manager.send_pid(conn_pid, {state.id, nil, {:manifest, state.manifest}}) - state - {:append, prev_root, msgitem, new_root} -> - # Append message: one single mesage has arrived - if new_root == state.mst.root do - # We already have the message, do nothing + def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do + if not SNet.Group.in_group?(state.group, conn_pid, auth) do + # Ignore message + {:noreply, state} + else + state = case msg do + {:get_manifest} -> + Shard.Manager.send_pid(conn_pid, {state.id, nil, {:manifest, state.manifest}}) state - else - # Try adding the message - {pk, bin, sign} = msgitem - if Shard.Keys.verify(pk, bin, sign) == :ok do - if prev_root == state.mst.root do - # Only one new message, insert it directly - mst2 = MST.insert(state.mst, msgitem) - if mst2.root == new_root do - state = %{state | mst: mst2} - GenServer.cast(state.page_store, {:set_roots, [mst2.root]}) - Shard.Manager.save_state(state.id, mst2.root) - msg_callback(state, msgitem) - state + {:append, prev_root, msgitem, new_root} -> + # Append message: one single mesage has arrived + if new_root == state.mst.root do + # We already have the message, do nothing + state + else + # Try adding the message + {pk, bin, sign} = msgitem + if Shard.Keys.verify(pk, bin, sign) == :ok do + if prev_root == state.mst.root do + # Only one new message, insert it directly + mst2 = MST.insert(state.mst, msgitem) + if mst2.root == new_root do + state = %{state | mst: mst2} + GenServer.cast(state.page_store, {:set_roots, [mst2.root]}) + Shard.Manager.save_state(state.id, mst2.root) + msg_callback(state, msgitem) + state + else + Logger.warn("Invalid new root after inserting same message item!") + state + end else - Logger.warn("Invalid new root after inserting same message item!") - state + # Not a simple one-insertion transition, look at the whole tree + init_merge(state, new_root, conn_pid) end else - # Not a simple one-insertion transition, look at the whole tree - init_merge(state, new_root, conn_pid) + Logger.warn("Received message with invalid signature") + state end - else - Logger.warn("Received message with invalid signature") + end + {:root, new_root} -> + if new_root == state.mst.root do + # already up to date, ignore state + else + init_merge(state, new_root, conn_pid) end - end - {:root, new_root} -> - if new_root == state.mst.root do - # already up to date, ignore + x -> + Logger.info("Unhandled message: #{inspect x}") state - else - init_merge(state, new_root, conn_pid) - end - x -> - Logger.info("Unhandled message: #{inspect x}") - state + end + {:noreply, state} end - {:noreply, state} end defp init_merge(state, new_root, source_peer_pid) do @@ -260,7 +255,7 @@ defmodule SApp.Chat do end end - defp msg_cmp({pk1, msgbin1, _sign1}, {pk2, msgbin2, _sign2}) do + def msg_cmp({pk1, msgbin1, _sign1}, {pk2, msgbin2, _sign2}) do {ts1, msg1} = SData.term_unbin msgbin1 {ts2, msg2} = SData.term_unbin msgbin2 cond do |