defmodule SApp.Chat do
@moduledoc """
Shard application for a replicated chat room with full history.
Chat rooms are globally identified by their channel name.
A chat room manifest is of the form:
%SApp.Chat.Manifest{channel: channel_name}
A private chat room manifest is of the form:
%SApp.Chat.PrivChat.Manifest{pk_list: ordered_list_of_authorized_pks}
Future improvements:
- message signing
- storage of the chatroom messages to disk
- use a DHT to find peers that are interested in this channel
- epidemic broadcast (careful not to be too costly,
maybe by limiting the number of peers we talk to)
- partial synchronization only == data distributed over peers
"""
use GenServer
require Logger
alias SData.MerkleSearchTree, as: MST
# =========
# MANIFESTS
# =========
defmodule Manifest do
  @moduledoc """
  Manifest identifying a public chat room by its channel name.
  """

  defstruct channel: nil

  defimpl Shard.Manifest do
    # Public chat manifests are handled by the SApp.Chat shard application.
    def module(_manifest) do
      SApp.Chat
    end
  end
end
defmodule PrivChat.Manifest do
  @moduledoc """
  Manifest identifying a private chat room by the list of its participants.

  Do not instantiate this struct directly; call `new/1` so that the
  representation is canonical.
  """

  defstruct pk_list: nil

  @doc """
  Builds a manifest whose key list is sorted and free of duplicates.
  """
  def new(pk_list) do
    canonical_pks =
      pk_list
      |> Enum.sort()
      |> Enum.uniq()

    %__MODULE__{pk_list: canonical_pks}
  end

  defimpl Shard.Manifest do
    # Private chat manifests are handled by the SApp.Chat shard application.
    def module(_manifest) do
      SApp.Chat
    end
  end
end
# ==========
# MAIN LOGIC
# ==========
@doc """
Starts a linked GenServer process for the chat room described by `manifest`.
"""
def start_link(manifest), do: GenServer.start_link(__MODULE__, manifest)
@doc """
GenServer `init/1` callback for a chat room process.

Hashes the manifest to obtain the shard id, picks the network group that
matches the manifest type, starts the page store and restores the saved root
and read marker when `Shard.Manager.load_state/1` returns them. A saved root
is only reused if the page store answers the `{:have_rec, root}` call with a
truthy value; otherwise the room starts from an empty tree.
"""
def init(manifest) do
  shard_id = SData.term_hash(manifest)

  group =
    case manifest do
      %PrivChat.Manifest{pk_list: pks} -> %SNet.PrivGroup{pk_list: pks}
      %Manifest{channel: _name} -> %SNet.PubShardGroup{id: shard_id}
    end

  Shard.Manager.dispatch_to(shard_id, nil, self())
  {:ok, store_pid} = SApp.PageStore.start_link(shard_id, :page_store, group)

  {saved_root, saved_read} =
    case Shard.Manager.load_state(shard_id) do
      %{root: r, read: rd} -> {r, rd}
      _ -> {nil, nil}
    end

  # Only trust the saved root when the page store reports having it.
  usable_root =
    if saved_root == nil || GenServer.call(store_pid, {:have_rec, saved_root}) do
      saved_root
    else
      Logger.warn "Not all pages for saved root were saved, restarting from an empty state!"
      nil
    end

  tree = %MST{
    store: %SApp.PageStore{pid: store_pid},
    cmp: &msg_cmp/2,
    root: usable_root
  }

  SNet.Group.init_lookup(group, self())

  initial_state = %{
    id: shard_id,
    netgroup: group,
    manifest: manifest,
    page_store: store_pid,
    mst: tree,
    subs: MapSet.new,
    read: saved_read
  }

  {:ok, initial_state}
end
@doc """
Handles the `:manifest` call by replying with the manifest of this chat room.
"""
def handle_call(:manifest, _from, state) do
  manifest = state.manifest
  {:reply, manifest, state}
end
# Replies with whatever `MST.last/3` returns for the given bound and count;
# the state is left untouched.
def handle_call({:read_history, top_bound, num}, _from, state) do
  {:reply, MST.last(state.mst, top_bound, num), state}
end
# Replies with the timestamp of the latest message when the current root
# differs from the root recorded as read, and with nil otherwise (also when
# the tree holds no message at all).
def handle_call(:has_unread, _from, state) do
  reply =
    if state.mst.root == state.read do
      nil
    else
      case MST.last(state.mst, nil, 1) do
        [] ->
          nil

        [{{_pk, msgbin, _sign}, true}] ->
          {ts, _} = SData.term_unbin(msgbin)
          ts
      end
    end

  {:reply, reply, state}
end
# Records the current root as the read marker and persists the state.
def handle_cast(:mark_read, state) do
  current_root = state.mst.root
  marked = %{state | read: current_root}
  save_state(marked)
  {:noreply, marked}
end
@doc """
Implementation of the :chat_send handler. This is the main handler that is used
to send a message to the chat room.

The message is timestamped, signed with the key `pk`, inserted in the local
tree, persisted, forwarded to local subscribers as
`{:chat_send, manifest, msgitem}` and broadcast to the network group as an
`:append` notification.

Crashes the process if `Shard.Keys.sign_detached/2` does not return
`{:ok, signature}`.
"""
def handle_cast({:chat_send, pk, msg}, state) do
  now = System.os_time(:second)

  # The new timestamp is strictly greater than that of the latest stored
  # message, even if the local clock is behind it.
  next_ts =
    case MST.last(state.mst, nil, 1) do
      [] ->
        now

      [{{_, msgbin, _}, true}] ->
        {ts, _msg} = SData.term_unbin(msgbin)
        max(ts + 1, now)
    end

  msgbin = SData.term_bin({next_ts, msg})
  {:ok, sign} = Shard.Keys.sign_detached(pk, msgbin)
  msgitem = {pk, msgbin, sign}

  prev_root = state.mst.root
  mst = MST.insert(state.mst, msgitem)

  # Tell the page store about the new root, exactly like the remote-append
  # and merge code paths do after changing the tree.
  # NOTE(review): presumably this is what keeps the new pages referenced in
  # the store - confirm against SApp.PageStore.
  GenServer.cast(state.page_store, {:set_roots, [mst.root]})

  state = %{state | mst: mst}
  save_state(state)

  for pid <- state.subs do
    if Process.alive?(pid) do
      send(pid, {:chat_send, state.manifest, msgitem})
    end
  end

  notif = {state.id, nil, {:append, prev_root, msgitem, mst.root}}
  SNet.Group.broadcast(state.netgroup, notif)
  {:noreply, state}
end
# When a peer connection comes up, ask the connection process to send the
# peer an :interested message carrying this shard's id.
def handle_cast({:peer_connected, conn_pid}, state) do
  interest = {:interested, [state.id]}
  GenServer.cast(conn_pid, {:send_msg, interest})
  {:noreply, state}
end
@doc """
Implementation of the :interested handler, called when a peer we are
connected to asks to receive data for this channel.

If the peer belongs to the room's network group, it is sent our current root
with the reply flag set to `true`.
"""
def handle_cast({:interested, conn_pid, auth}, state) do
  allowed = SNet.Group.in_group?(state.netgroup, conn_pid, auth)

  if allowed do
    root_msg = {:root, state.mst.root, true}
    SNet.Manager.send_pid(conn_pid, {state.id, nil, root_msg})
  end

  {:noreply, state}
end
# Registers `pid` as a subscriber and monitors it so that it can be removed
# from the set when it exits. A pid that is already subscribed is already
# monitored, so it is not monitored a second time.
def handle_cast({:subscribe, pid}, state) do
  if MapSet.member?(state.subs, pid) do
    {:noreply, state}
  else
    Process.monitor(pid)
    {:noreply, %{state | subs: MapSet.put(state.subs, pid)}}
  end
end
@doc """
Implementation of the :msg handler, which is the main handler for messages
coming from other peers concerning this chat room.

Messages from connections that fail the `SNet.Group.in_group?/3` check are
ignored. Otherwise the payload is one of:

- `{:get_manifest}`: reply to the peer with this room's manifest
- `{:append, prev_root, msgitem, new_root}`: a single new message
- `{:root, new_root, ask_reply}`: the peer's current root; when `ask_reply`
  is truthy we answer with our own root

Any other payload is logged and dropped.
"""
def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do
if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
# Ignore message
{:noreply, state}
else
state = case msg do
{:get_manifest} ->
SNet.Manager.send_pid(conn_pid, {state.id, nil, {:manifest, state.manifest}})
state
{:append, prev_root, msgitem, new_root} ->
# Append message: one single message has arrived
if new_root == state.mst.root do
# We already have the message, do nothing
state
else
# Try adding the message
{pk, bin, sign} = msgitem
if Shard.Keys.verify(pk, bin, sign) == :ok do
if prev_root == state.mst.root do
# Only one new message, insert it directly
mst2 = MST.insert(state.mst, msgitem)
# Accept the insertion only if it yields the root announced by the peer
if mst2.root == new_root do
state = %{state | mst: mst2}
GenServer.cast(state.page_store, {:set_roots, [mst2.root]})
save_state(state)
# Notify local subscribers, then forward the same :append message to the
# rest of the group, skipping the peer it came from
msg_callback(state, msgitem)
SNet.Group.broadcast(state.netgroup, {state.id, nil, msg}, exclude_pid: [conn_pid])
state
else
Logger.warn("Invalid new root after inserting same message item!")
state
end
else
# Not a simple one-insertion transition, look at the whole tree
init_merge(state, new_root, conn_pid)
end
else
Logger.warn("Received message with invalid signature")
state
end
end
{:root, new_root, ask_reply} ->
state = if new_root == state.mst.root do
# already up to date, ignore
state
else
init_merge(state, new_root, conn_pid)
end
# Answer with our (possibly just merged) root; the reply flag is false so
# that the peer does not answer back again
if ask_reply do
SNet.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root, false}})
end
state
x ->
Logger.info("Unhandled message: #{inspect x}")
state
end
{:noreply, state}
end
end
# Merges the tree rooted at `new_root`, as advertised by `source_peer_pid`,
# into the local tree and returns the resulting state.
#
# - A nil `new_root` is a no-op.
# - The merged tree is only adopted if every item that newly appears among
#   the last 100 items carries a valid signature; otherwise the merged data is
#   dropped and the original state is returned.
# - On success, subscribers are notified of the new items, the page store is
#   told the new root, the state is persisted and, if the root changed, the
#   new root is broadcast to the group (except to the source peer).
defp init_merge(state, new_root, source_peer_pid) do
  old_root = state.mst.root

  if new_root == nil do
    state
  else
    # TODO: make the merge asynchronous
    Logger.info("Starting merge for #{inspect state.manifest}, merging root: #{new_root|>Base.encode16}")

    # Remember the most recent items so that new ones can be told apart later.
    prev_last = for {x, true} <- MST.last(state.mst, nil, 100), into: MapSet.new, do: x

    # Same tree settings as ours, but rooted at the remote root and with the
    # source peer set as the store's `prefer_ask` target.
    mgmst = %{state.mst | root: new_root}
    mgmst = put_in(mgmst.store.prefer_ask, [source_peer_pid])
    mst = MST.merge(state.mst, mgmst)

    new =
      for {x, true} <- MST.last(mst, nil, 100),
          not MapSet.member?(prev_last, x),
          do: x

    # Shard.Keys.verify/3 signals success with :ok; any other return value is
    # still truthy, so the result must be compared against :ok explicitly.
    # Items that are not {pk, bin, sign} triples count as invalid.
    all_valid =
      Enum.all?(new, fn
        {pk, bin, sign} -> Shard.Keys.verify(pk, bin, sign) == :ok
        _ -> false
      end)

    if all_valid do
      for x <- new do
        msg_callback(state, x)
      end

      GenServer.cast(state.page_store, {:set_roots, [mst.root]})
      state = %{state | mst: mst}
      save_state(state)

      if state.mst.root != old_root do
        SNet.Group.broadcast(state.netgroup, {state.id, nil, {:root, state.mst.root, false}}, exclude_pid: [source_peer_pid])
      end

      state
    else
      Logger.warn("Incorrect signatures somewhere while merging, dropping merged data")
      state
    end
  end
end
# A monitored subscriber went down: drop it from the subscriber set.
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
  remaining = MapSet.delete(state.subs, pid)
  {:noreply, %{state | subs: remaining}}
end
# Persists the current root and the read marker under this shard's id.
defp save_state(state) do
  snapshot = %{root: state.mst.root, read: state.read}
  Shard.Manager.save_state(state.id, snapshot)
end
# Sends a {:chat_recv, manifest, item} notification to every subscriber
# process that is still alive.
defp msg_callback(state, {pk, msgbin, sign}) do
  notification = {:chat_recv, state.manifest, {pk, msgbin, sign}}

  Enum.map(state.subs, fn subscriber ->
    if Process.alive?(subscriber), do: send(subscriber, notification)
  end)
end
# Ordering function for message items: compares by timestamp first, then by
# public key, then by message content. Signatures are not compared.
defp msg_cmp({pk1, msgbin1, _sign1}, {pk2, msgbin2, _sign2}) do
  {ts1, msg1} = SData.term_unbin(msgbin1)
  {ts2, msg2} = SData.term_unbin(msgbin2)

  # Same-size tuples compare element by element, which gives exactly the
  # timestamp / key / content precedence.
  left = {ts1, pk1, msg1}
  right = {ts2, pk2, msg2}

  cond do
    left > right -> :after
    left < right -> :before
    true -> :duplicate
  end
end
# ================
# PUBLIC INTERFACE
# ================
@doc """
Subscribe the calling process to notifications for this chat room.

The caller will start receiving messages of the form:

    {:chat_recv, manifest, {pk, msgbin, sign}}

or

    {:chat_send, manifest, {pk, msgbin, sign}}

`msgbin` can be decoded in the following way:

    {timestamp, message} = SData.term_unbin msgbin
"""
def subscribe(shard_pid) do
  subscriber = self()
  GenServer.cast(shard_pid, {:subscribe, subscriber})
end
@doc """
Asynchronously asks the chat room process to send `msg`, signed with the key `pk`.
"""
def chat_send(shard_pid, pk, msg) do
  request = {:chat_send, pk, msg}
  GenServer.cast(shard_pid, request)
end
@doc """
Read the history of a chat room.

`bound` is the last message to read. If it is nil, the `n` last messages are
read. If it is not nil, the `n` last messages up to the specified bound are
read.
"""
def read_history(shard_pid, bound, n) do
  request = {:read_history, bound, n}
  GenServer.call(shard_pid, request)
end
@doc """
Ask the shard process `shard_pid` for its manifest.
"""
def get_manifest(shard_pid), do: GenServer.call(shard_pid, :manifest)
@doc """
Returns the timestamp of the last message if the chat room has unread
messages, nil otherwise.
"""
def has_unread?(shard_pid), do: GenServer.call(shard_pid, :has_unread)
@doc """
Asynchronously mark all messages of the chat room as read.
"""
def mark_read(shard_pid), do: GenServer.cast(shard_pid, :mark_read)
end