diff options
author | Alex Auvolat <alex@adnab.me> | 2018-09-01 16:06:23 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2018-09-01 16:06:23 +0200 |
commit | c6ec33d6e612168e14d77007915a4ea423c55a2e (patch) | |
tree | 8b5645651a0cc991b8ac9c68c388d84c8dbe73d2 /shard/lib/app/chat.ex | |
parent | 1a0ef154a421af60f6d57dfe861dacb844a7d142 (diff) | |
download | shard-c6ec33d6e612168e14d77007915a4ea423c55a2e.tar.gz shard-c6ec33d6e612168e14d77007915a4ea423c55a2e.zip |
Move everything to subdirectory
Diffstat (limited to 'shard/lib/app/chat.ex')
-rw-r--r-- | shard/lib/app/chat.ex | 208 |
1 files changed, 208 insertions, 0 deletions
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex new file mode 100644 index 0000000..051fab6 --- /dev/null +++ b/shard/lib/app/chat.ex @@ -0,0 +1,208 @@ +defmodule SApp.Chat do + @moduledoc """ + Shard application for a replicated chat room with full history. + + Chat rooms are globally identified by their channel name. + A chat room manifest is of the form: + + {:chat, channel_name} + + Future improvements: + - message signing + - storage of the chatroom messages to disk + - storage of the known peers that have this channel to disk + - use a DHT to find peers that are interested in this channel + - epidemic broadcast (carefull not to be too costly, + maybe by limiting the number of peers we talk to) + - partial synchronization only == data distributed over peers + """ + + use GenServer + + require Logger + alias SData.MerkleSearchTree, as: MST + + @doc """ + Start a process that connects to a given channel + """ + def start_link(channel) do + GenServer.start_link(__MODULE__, channel) + end + + @doc """ + Initialize channel process. + """ + def init(channel) do + manifest = {:chat, channel} + id = SData.term_hash manifest + + case Shard.Manager.register(id, manifest, self()) do + :ok -> + Shard.Manager.dispatch_to(id, nil, self()) + {:ok, block_store} = SApp.BlockStore.start_link(id, :block_store) + mst = %MST{store: %SApp.BlockStore{pid: block_store}, + cmp: &msg_cmp/2} + GenServer.cast(self(), :init_pull) + {:ok, + %{channel: channel, + id: id, + manifest: manifest, + block_store: block_store, + mst: mst, + subs: MapSet.new, + } + } + :redundant -> + exit(:redundant) + end + end + + @doc """ + Implementation of the :manifest call that returns the chat room's manifest + """ + def handle_call(:manifest, _from, state) do + {:reply, state.manifest, state} + end + + def handle_call({:read_history, top_bound, num}, _from, state) do + ret = MST.last(state.mst, top_bound, num) + {:reply, ret, state} + end + + @doc """ + Implementation of the :init_pull handler, which is called when the + process starts. It contacts all currently connected peers and asks them to + send data for this channel if they have some. + """ + def handle_cast(:init_pull, state) do + for {_, pid, _, _} <- :ets.tab2list(:peer_db) do + GenServer.cast(pid, {:send_msg, {:interested, [state.id]}}) + end + {:noreply, state} + end + + @doc """ + Implementation of the :chat_send handler. This is the main handler that is used + to send a message to the chat room. Puts the message in the store and syncs + with all connected peers. + """ + def handle_cast({:chat_send, msg}, state) do + msgitem = {(System.os_time :seconds), + Shard.Identity.get_nickname(), + msg} + prev_root = state.mst.root + mst = MST.insert(state.mst, msgitem) + state = %{state | mst: mst} + + for pid <- state.subs do + if Process.alive?(pid) do + send(pid, {:chat_send, state.channel, msgitem}) + end + end + + notif = {:append, prev_root, msgitem, mst.root} + for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do + Shard.Manager.send(peer_id, {state.id, nil, notif}) + end + + {:noreply, state} + end + + @doc """ + Implementation of the :interested handler, this is called when a peer we are + connected to asks to recieve data for this channel. + """ + def handle_cast({:interested, peer_id}, state) do + Shard.Manager.send(peer_id, {state.id, nil, {:root, state.mst.root}}) + {:noreply, state} + end + + def handle_cast({:subscribe, pid}, state) do + Process.monitor(pid) + new_subs = MapSet.put(state.subs, pid) + {:noreply, %{ state | subs: new_subs }} + end + + @doc """ + Implementation of the :msg handler, which is the main handler for messages + comming from other peers concerning this chat room. + + Messages are: + - `{:get, start}`: get some messages starting at a given Merkle hash + - `{:info, start, list, rest}`: put some messages and informs of the + Merkle hash of the store of older messages. + """ + def handle_cast({:msg, peer_id, _shard_id, nil, msg}, state) do + state = case msg do + {:get_manifest} -> + Shard.Manager.send(peer_id, {state.id, nil, {:manifest, state.manifest}}) + state + {:append, prev_root, msgitem, new_root} -> + # Append message: one single mesage has arrived + if new_root == state.mst.root do + # We already have the message, do nothing + state + else + # Try adding the message + if prev_root == state.mst.root do + mst2 = MST.insert(state.mst, msgitem) + if mst2.root == new_root do + # This was the only message missing, we are happy! + state = %{state | mst: mst2} + msg_callback(state, msgitem) + state + else + # More messages are missing, start a full merge + init_merge(state, new_root, peer_id) + end + else + init_merge(state, new_root, peer_id) + end + end + {:root, new_root} -> + if new_root == state.mst.root do + # already up to date, ignore + state + else + init_merge(state, new_root, peer_id) + end + x -> + Logger.info("Unhandled message: #{inspect x}") + state + end + {:noreply, state} + end + + defp init_merge(state, new_root, source_peer) do + # TODO: make the merge asynchronous + mgmst = %{state.mst | root: new_root} + mgmst = put_in(mgmst.store.prefer_ask, [source_peer]) + mst = MST.merge(state.mst, mgmst, fn msgitem, true -> msg_callback(state, msgitem) end) + %{state | mst: mst} + end + + def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do + new_subs = MapSet.delete(state.subs, pid) + {:noreply, %{ state | subs: new_subs }} + end + + defp msg_callback(state, {ts, nick, msg}) do + for pid <- state.subs do + if Process.alive?(pid) do + send(pid, {:chat_recv, state.channel, {ts, nick, msg}}) + end + end + end + + defp msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do + cond do + ts1 > ts2 -> :after + ts1 < ts2 -> :before + nick1 > nick2 -> :after + nick1 < nick2 -> :before + msg1 > msg2 -> :after + msg1 < msg2 -> :before + true -> :duplicate + end + end +end |