defmodule SApp.Chat do @moduledoc """ Shard application for a replicated chat room with full history. Chat rooms are globally identified by their channel name. A chat room manifest is of the form: %SApp.Chat.Manifest{channel: channel_name} A private chat room manifest is of the form: %SApp.Chat.PrivChat.Manifest{pk_list: ordered_list_of_authorized_pks} Future improvements: - message signing - storage of the chatroom messages to disk - use a DHT to find peers that are interested in this channel - epidemic broadcast (carefull not to be too costly, maybe by limiting the number of peers we talk to) - partial synchronization only == data distributed over peers """ use GenServer require Logger alias SData.MerkleSearchTree, as: MST # ========= # MANIFESTS # ========= defmodule Manifest do defstruct [:channel] end defimpl Shard.Manifest, for: Manifest do def start(m) do DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, m}) end end defmodule PrivChat.Manifest do defstruct [:pk_list] def new(pk_list) do %__MODULE__{pk_list: Enum.sort(pk_list)} end end defimpl Shard.Manifest, for: PrivChat.Manifest do def start(m) do DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, m}) end end # ========== # MAIN LOGIC # ========== @doc """ Start a process that connects to a given channel """ def start_link(manifest) do GenServer.start_link(__MODULE__, manifest) end @doc """ Initialize channel process. """ def init(manifest) do id = SData.term_hash manifest case Shard.Manager.register(id, manifest, self()) do :ok -> Shard.Manager.dispatch_to(id, nil, self()) {:ok, page_store} = SApp.PageStore.start_link(id, :page_store) root = Shard.Manager.load_state id root = cond do root == nil -> nil GenServer.call(page_store, {:have_rec, root}) -> root true -> Logger.warn "Not all pages for saved root were saved, restarting from an empty state!" nil end mst = %MST{store: %SApp.PageStore{pid: page_store}, cmp: &msg_cmp/2, root: root} netgroup = case manifest do %Manifest{channel: _channel} -> %SNet.PubShardGroup{id: id} %PrivChat.Manifest{pk_list: pk_list} -> %SNet.PrivGroup{pk_list: pk_list} end SNet.Group.init_lookup(netgroup, self()) {:ok, %{id: id, netgroup: netgroup, manifest: manifest, page_store: page_store, mst: mst, subs: MapSet.new, } } :redundant -> exit(:redundant) end end def find_proc(chan) do manifest = %Manifest{channel: chan} id = SData.term_hash manifest Shard.Manager.find_proc id end @doc """ Implementation of the :manifest call that returns the chat room's manifest """ def handle_call(:manifest, _from, state) do {:reply, state.manifest, state} end def handle_call({:read_history, top_bound, num}, _from, state) do ret = MST.last(state.mst, top_bound, num) {:reply, ret, state} end @doc """ Implementation of the :chat_send handler. This is the main handler that is used to send a message to the chat room. Puts the message in the store and syncs with all connected peers. """ def handle_cast({:chat_send, pk, msg}, state) do msgbin = SData.term_bin {(System.os_time :seconds), msg} {:ok, sign} = Shard.Keys.sign_detached(pk, msgbin) msgitem = {pk, msgbin, sign} prev_root = state.mst.root mst = MST.insert(state.mst, msgitem) state = %{state | mst: mst} Shard.Manager.save_state(state.id, mst.root) for pid <- state.subs do if Process.alive?(pid) do send(pid, {:chat_send, state.manifest, msgitem}) end end notif = {state.id, nil, {:append, prev_root, msgitem, mst.root}} SNet.Group.broadcast(state.netgroup, notif) {:noreply, state} end def handle_cast({:peer_connected, conn_pid}, state) do # this is called by the SNet.Group thing so it is already authenticated SNet.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root}}) {:noreply, state} end @doc """ Implementation of the :interested handler, this is called when a peer we are connected to asks to recieve data for this channel. """ def handle_cast({:interested, conn_pid, auth}, state) do if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do SNet.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root}}) end {:noreply, state} end def handle_cast({:subscribe, pid}, state) do Process.monitor(pid) new_subs = MapSet.put(state.subs, pid) {:noreply, %{ state | subs: new_subs }} end @doc """ Implementation of the :msg handler, which is the main handler for messages comming from other peers concerning this chat room. Messages are: - `{:get, start}`: get some messages starting at a given Merkle hash - `{:info, start, list, rest}`: put some messages and informs of the Merkle hash of the store of older messages. """ def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do # Ignore message {:noreply, state} else state = case msg do {:get_manifest} -> SNet.Manager.send_pid(conn_pid, {state.id, nil, {:manifest, state.manifest}}) state {:append, prev_root, msgitem, new_root} -> # Append message: one single mesage has arrived if new_root == state.mst.root do # We already have the message, do nothing state else # Try adding the message {pk, bin, sign} = msgitem if Shard.Keys.verify(pk, bin, sign) == :ok do if prev_root == state.mst.root do # Only one new message, insert it directly mst2 = MST.insert(state.mst, msgitem) if mst2.root == new_root do state = %{state | mst: mst2} GenServer.cast(state.page_store, {:set_roots, [mst2.root]}) Shard.Manager.save_state(state.id, mst2.root) msg_callback(state, msgitem) state else Logger.warn("Invalid new root after inserting same message item!") state end else # Not a simple one-insertion transition, look at the whole tree init_merge(state, new_root, conn_pid) end else Logger.warn("Received message with invalid signature") state end end {:root, new_root} -> if new_root == state.mst.root do # already up to date, ignore state else init_merge(state, new_root, conn_pid) end x -> Logger.info("Unhandled message: #{inspect x}") state end {:noreply, state} end end defp init_merge(state, new_root, source_peer_pid) do if new_root == nil do state else # TODO: make the merge asynchronous Logger.info("Starting merge for #{inspect state.manifest}, merging root: #{new_root|>Base.encode16}") prev_last = for {x, true} <- MST.last(state.mst, nil, 100), into: MapSet.new, do: x mgmst = %{state.mst | root: new_root} mgmst = put_in(mgmst.store.prefer_ask, [source_peer_pid]) mst = MST.merge(state.mst, mgmst) new = for {x, true} <- MST.last(mst, nil, 100), not MapSet.member?(prev_last, x) do x end correct = for x <- new do {pk, bin, sign} = x Shard.Keys.verify(pk, bin, sign) end if Enum.all? correct do for x <- new do msg_callback(state, x) end GenServer.cast(state.page_store, {:set_roots, [mst.root]}) Shard.Manager.save_state(state.id, mst.root) %{state | mst: mst} else Logger.warn("Incorrect signatures somewhere while merging, dropping merged data") state end end end def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do new_subs = MapSet.delete(state.subs, pid) {:noreply, %{ state | subs: new_subs }} end defp msg_callback(state, {pk, msgbin, sign}) do for pid <- state.subs do if Process.alive?(pid) do send(pid, {:chat_recv, state.manifest, {pk, msgbin, sign}}) end end end def msg_cmp({pk1, msgbin1, _sign1}, {pk2, msgbin2, _sign2}) do {ts1, msg1} = SData.term_unbin msgbin1 {ts2, msg2} = SData.term_unbin msgbin2 cond do ts1 > ts2 -> :after ts1 < ts2 -> :before pk1 > pk2 -> :after pk1 < pk2 -> :before msg1 > msg2 -> :after msg1 < msg2 -> :before true -> :duplicate end end end