defmodule SApp.Chat do @moduledoc """ Shard application for a replicated chat room with full history. Chat rooms are globally identified by their channel name. A chat room manifest is of the form: %SApp.Chat.Manifest{channel: channel_name} A private chat room manifest is of the form: %SApp.Chat.PrivChat.Manifest{pk_list: ordered_list_of_authorized_pks} Future improvements: - use a DHT to find peers that are interested in this channel - partial synchronization only == data distributed over peers """ use GenServer require Logger alias SData.MerkleSearchTree, as: MST # ========= # MANIFESTS # ========= defmodule Manifest do @moduledoc""" Manifest for a public chat room defined by its name. Example: %SApp.Chat.Manifest{channel: "test"} """ defstruct [:channel] defimpl Shard.Manifest do def module(_m), do: SApp.Chat def is_valid?(_m), do: true end end defmodule PrivChat.Manifest do @moduledoc""" Manifest for a private chat room defined by the list of participants. Do not instanciate this struct directly, use `new` to ensure a canonical representation. """ defstruct [:pk_list] @doc""" Ensures a canonical representation by sorting pks and removing duplicates. """ def new(pk_list) do %__MODULE__{pk_list: pk_list |> Enum.sort |> Enum.uniq} end defimpl Shard.Manifest do def module(_m), do: SApp.Chat def is_valid?(m) do Enum.all?(m.pk_list, &(byte_size(&1)==32)) and m.pk_list == m.pk_list |> Enum.sort |> Enum.uniq end end end # ========== # MAIN LOGIC # ========== defmodule State do @moduledoc""" Internal state struct of chat shard. """ defstruct [:id, :netgroup, :manifest, :page_store, :mst, :subs, :read] end @doc """ Start a process that connects to a given channel. Don't call directly, use for instance: Shard.Manager.find_or_start %SApp.Chat.Manifest{channel: "my_chan"} """ def start_link(manifest) do GenServer.start_link(__MODULE__, manifest) end def init(manifest) do id = SData.term_hash manifest netgroup = case manifest do %Manifest{channel: _channel} -> %SNet.PubShardGroup{id: id} %PrivChat.Manifest{pk_list: pk_list} -> %SNet.PrivGroup{pk_list: pk_list} end Shard.Manager.dispatch_to(id, nil, self()) {:ok, page_store} = SApp.PageStore.start_link(id, :page_store, netgroup) {root, read} = case Shard.Manager.load_state id do %{root: root, read: read} -> {root, read} _ -> {nil, nil} end root = cond do root == nil -> nil SApp.PageStore.have_rec?(page_store, root) -> root true -> Logger.warn "Not all pages for saved root were saved, restarting from an empty state!" nil end mst = %MST{store: %SApp.PageStore{pid: page_store}, cmp: &msg_cmp/2, root: root} SNet.Group.init_lookup(netgroup, self()) {:ok, %State{id: id, netgroup: netgroup, manifest: manifest, page_store: page_store, mst: mst, subs: MapSet.new, read: read, } } end def handle_call(:manifest, _from, state) do {:reply, state.manifest, state} end def handle_call(:delete_shard, _from, state) do SApp.PageStore.delete_store(state.store) {:stop, :normal, :ok, state} end def handle_call({:read_history, top_bound, num}, _from, state) do ret = MST.last(state.mst, top_bound, num) {:reply, ret, state} end def handle_call(:has_unread, _from, state) do if state.mst.root != state.read do case MST.last(state.mst, nil, 1) do [{{_, msgbin, _}, true}] -> {ts, _} = SData.term_unbin msgbin {:reply, ts, state} [] -> {:reply, nil, state} end else {:reply, nil, state} end end def handle_cast(:send_deps, state) do GenServer.cast(Shard.Manager, {:dep_list, state.id, []}) {:noreply, state} end def handle_cast(:mark_read, state) do state = %{state | read: state.mst.root} save_state(state) {:noreply, state} end def handle_cast({:chat_send, pk, msg}, state) do next_ts = case MST.last(state.mst, nil, 1) do [] -> System.os_time :seconds [{{_, msgbin, _}, true}] -> {ts, _msg} = SData.term_unbin msgbin max(ts + 1, System.os_time :seconds) end msgbin = SData.term_bin {next_ts, msg} {:ok, sign} = Shard.Keys.sign_detached(pk, msgbin) msgitem = {pk, msgbin, sign} prev_root = state.mst.root mst = MST.insert(state.mst, msgitem) state = %{state | mst: mst} save_state(state) for pid <- state.subs do if Process.alive?(pid) do send(pid, {:chat_send, state.manifest, msgitem}) end end notif = {state.id, nil, {:append, prev_root, msgitem, mst.root}} SNet.Group.broadcast(state.netgroup, notif) {:noreply, state} end def handle_cast({:peer_connected, conn_pid}, state) do GenServer.cast(conn_pid, {:send_msg, {:interested, [state.id]}}) {:noreply, state} end def handle_cast({:interested, conn_pid, auth}, state) do if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do SNet.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root, true}}) end {:noreply, state} end def handle_cast({:subscribe, pid}, state) do Process.monitor(pid) new_subs = MapSet.put(state.subs, pid) {:noreply, %{ state | subs: new_subs }} end def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do # Ignore message {:noreply, state} else state = case msg do {:append, prev_root, msgitem, new_root} -> # Append message: one single mesage has arrived if new_root == state.mst.root do # We already have the message, do nothing state else # Try adding the message {pk, bin, sign} = msgitem if Shard.Keys.verify(pk, bin, sign) == :ok do if prev_root == state.mst.root do # Only one new message, insert it directly mst2 = MST.insert(state.mst, msgitem) if mst2.root == new_root do state = %{state | mst: mst2} SApp.PageStore.set_roots(state.page_store, [mst2.root]) save_state(state) msg_callback(state, msgitem) SNet.Group.broadcast(state.netgroup, {state.id, nil, msg}, exclude_pid: [conn_pid]) state else Logger.warn("Invalid new root after inserting same message item!") state end else # Not a simple one-insertion transition, look at the whole tree init_merge(state, new_root, conn_pid) end else Logger.warn("Received message with invalid signature") state end end {:root, new_root, ask_reply} -> state = if new_root == state.mst.root do # already up to date, ignore state else init_merge(state, new_root, conn_pid) end if ask_reply do SNet.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root, false}}) end state x -> Logger.info("Unhandled message: #{inspect x}") state end {:noreply, state} end end defp init_merge(state, new_root, source_peer_pid) do old_root = state.mst.root if new_root == nil do state else # TODO: make the merge asynchronous Logger.info("Starting merge for #{inspect state.manifest}, merging root: #{new_root|>Base.encode16}") prev_last = for {x, true} <- MST.last(state.mst, nil, 100), into: MapSet.new, do: x mgmst = %{state.mst | root: new_root} mgmst = put_in(mgmst.store.prefer_ask, [source_peer_pid]) mst = MST.merge(state.mst, mgmst) new = for {x, true} <- MST.last(mst, nil, 100), not MapSet.member?(prev_last, x) do x end correct = for x <- new do {pk, bin, sign} = x Shard.Keys.verify(pk, bin, sign) end if Enum.all? correct do for x <- new do msg_callback(state, x) end SApp.PageStore.set_roots(state.page_store, [mst.root]) state = %{state | mst: mst} save_state(state) if state.mst.root != old_root do SNet.Group.broadcast(state.netgroup, {state.id, nil, {:root, state.mst.root, false}}, exclude_pid: [source_peer_pid]) end state else Logger.warn("Incorrect signatures somewhere while merging, dropping merged data") state end end end def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do new_subs = MapSet.delete(state.subs, pid) {:noreply, %{ state | subs: new_subs }} end defp save_state(state) do Shard.Manager.save_state(state.id, %{root: state.mst.root, read: state.read}) end defp msg_callback(state, {pk, msgbin, sign}) do for pid <- state.subs do if Process.alive?(pid) do send(pid, {:chat_recv, state.manifest, {pk, msgbin, sign}}) end end end defp msg_cmp({pk1, msgbin1, _sign1}, {pk2, msgbin2, _sign2}) do {ts1, msg1} = SData.term_unbin msgbin1 {ts2, msg2} = SData.term_unbin msgbin2 cond do ts1 > ts2 -> :after ts1 < ts2 -> :before pk1 > pk2 -> :after pk1 < pk2 -> :before msg1 > msg2 -> :after msg1 < msg2 -> :before true -> :duplicate end end # ================ # PUBLIC INTERFACE # ================ @doc""" Subscribe to notifications for this chat room. The process calling this function will start recieving messages of the form: {:chat_recv, manifest, {pk, msgbin, sign}} or {:chat_send, manifest, {pk, msgbin, sign}} msgbin can be used in the following way: {timestamp, message} = SData.term_unbin msgbin """ def subscribe(shard_pid) do GenServer.cast(shard_pid, {:subscribe, self()}) end @doc""" Send a message to a chat room. """ def chat_send(shard_pid, pk, msg) do GenServer.cast(shard_pid, {:chat_send, pk, msg}) end @doc""" Read the history of a chat room. The second argument is the last message to read. If nil, will read the n last messages. If not nill, will read the n last messages until the specified bound. """ def read_history(shard_pid, bound, n) do GenServer.call(shard_pid, {:read_history, bound, n}) end @doc""" Return a shard's manifest from its pid. """ def get_manifest(shard_pid) do GenServer.call(shard_pid, :manifest) end @doc""" Returns timestamp of last message if chat room has unread messages, nil otherwise. """ def has_unread?(shard_pid) do GenServer.call(shard_pid, :has_unread) end @doc""" Mark all messages as read """ def mark_read(shard_pid) do GenServer.cast(shard_pid, :mark_read) end end