aboutsummaryrefslogtreecommitdiff
path: root/lib/app/chat.ex
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2018-09-01 16:06:23 +0200
committerAlex Auvolat <alex@adnab.me>2018-09-01 16:06:23 +0200
commitc6ec33d6e612168e14d77007915a4ea423c55a2e (patch)
tree8b5645651a0cc991b8ac9c68c388d84c8dbe73d2 /lib/app/chat.ex
parent1a0ef154a421af60f6d57dfe861dacb844a7d142 (diff)
downloadshard-c6ec33d6e612168e14d77007915a4ea423c55a2e.tar.gz
shard-c6ec33d6e612168e14d77007915a4ea423c55a2e.zip
Move everything to subdirectory
Diffstat (limited to 'lib/app/chat.ex')
-rw-r--r--lib/app/chat.ex208
1 files changed, 0 insertions, 208 deletions
diff --git a/lib/app/chat.ex b/lib/app/chat.ex
deleted file mode 100644
index 051fab6..0000000
--- a/lib/app/chat.ex
+++ /dev/null
@@ -1,208 +0,0 @@
-defmodule SApp.Chat do
- @moduledoc """
- Shard application for a replicated chat room with full history.
-
- Chat rooms are globally identified by their channel name.
- A chat room manifest is of the form:
-
- {:chat, channel_name}
-
- Future improvements:
- - message signing
- - storage of the chatroom messages to disk
- - storage of the known peers that have this channel to disk
- - use a DHT to find peers that are interested in this channel
- - epidemic broadcast (carefull not to be too costly,
- maybe by limiting the number of peers we talk to)
- - partial synchronization only == data distributed over peers
- """
-
- use GenServer
-
- require Logger
- alias SData.MerkleSearchTree, as: MST
-
- @doc """
- Start a process that connects to a given channel
- """
- def start_link(channel) do
- GenServer.start_link(__MODULE__, channel)
- end
-
- @doc """
- Initialize channel process.
- """
- def init(channel) do
- manifest = {:chat, channel}
- id = SData.term_hash manifest
-
- case Shard.Manager.register(id, manifest, self()) do
- :ok ->
- Shard.Manager.dispatch_to(id, nil, self())
- {:ok, block_store} = SApp.BlockStore.start_link(id, :block_store)
- mst = %MST{store: %SApp.BlockStore{pid: block_store},
- cmp: &msg_cmp/2}
- GenServer.cast(self(), :init_pull)
- {:ok,
- %{channel: channel,
- id: id,
- manifest: manifest,
- block_store: block_store,
- mst: mst,
- subs: MapSet.new,
- }
- }
- :redundant ->
- exit(:redundant)
- end
- end
-
- @doc """
- Implementation of the :manifest call that returns the chat room's manifest
- """
- def handle_call(:manifest, _from, state) do
- {:reply, state.manifest, state}
- end
-
- def handle_call({:read_history, top_bound, num}, _from, state) do
- ret = MST.last(state.mst, top_bound, num)
- {:reply, ret, state}
- end
-
- @doc """
- Implementation of the :init_pull handler, which is called when the
- process starts. It contacts all currently connected peers and asks them to
- send data for this channel if they have some.
- """
- def handle_cast(:init_pull, state) do
- for {_, pid, _, _} <- :ets.tab2list(:peer_db) do
- GenServer.cast(pid, {:send_msg, {:interested, [state.id]}})
- end
- {:noreply, state}
- end
-
- @doc """
- Implementation of the :chat_send handler. This is the main handler that is used
- to send a message to the chat room. Puts the message in the store and syncs
- with all connected peers.
- """
- def handle_cast({:chat_send, msg}, state) do
- msgitem = {(System.os_time :seconds),
- Shard.Identity.get_nickname(),
- msg}
- prev_root = state.mst.root
- mst = MST.insert(state.mst, msgitem)
- state = %{state | mst: mst}
-
- for pid <- state.subs do
- if Process.alive?(pid) do
- send(pid, {:chat_send, state.channel, msgitem})
- end
- end
-
- notif = {:append, prev_root, msgitem, mst.root}
- for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do
- Shard.Manager.send(peer_id, {state.id, nil, notif})
- end
-
- {:noreply, state}
- end
-
- @doc """
- Implementation of the :interested handler, this is called when a peer we are
- connected to asks to recieve data for this channel.
- """
- def handle_cast({:interested, peer_id}, state) do
- Shard.Manager.send(peer_id, {state.id, nil, {:root, state.mst.root}})
- {:noreply, state}
- end
-
- def handle_cast({:subscribe, pid}, state) do
- Process.monitor(pid)
- new_subs = MapSet.put(state.subs, pid)
- {:noreply, %{ state | subs: new_subs }}
- end
-
- @doc """
- Implementation of the :msg handler, which is the main handler for messages
- comming from other peers concerning this chat room.
-
- Messages are:
- - `{:get, start}`: get some messages starting at a given Merkle hash
- - `{:info, start, list, rest}`: put some messages and informs of the
- Merkle hash of the store of older messages.
- """
- def handle_cast({:msg, peer_id, _shard_id, nil, msg}, state) do
- state = case msg do
- {:get_manifest} ->
- Shard.Manager.send(peer_id, {state.id, nil, {:manifest, state.manifest}})
- state
- {:append, prev_root, msgitem, new_root} ->
- # Append message: one single mesage has arrived
- if new_root == state.mst.root do
- # We already have the message, do nothing
- state
- else
- # Try adding the message
- if prev_root == state.mst.root do
- mst2 = MST.insert(state.mst, msgitem)
- if mst2.root == new_root do
- # This was the only message missing, we are happy!
- state = %{state | mst: mst2}
- msg_callback(state, msgitem)
- state
- else
- # More messages are missing, start a full merge
- init_merge(state, new_root, peer_id)
- end
- else
- init_merge(state, new_root, peer_id)
- end
- end
- {:root, new_root} ->
- if new_root == state.mst.root do
- # already up to date, ignore
- state
- else
- init_merge(state, new_root, peer_id)
- end
- x ->
- Logger.info("Unhandled message: #{inspect x}")
- state
- end
- {:noreply, state}
- end
-
- defp init_merge(state, new_root, source_peer) do
- # TODO: make the merge asynchronous
- mgmst = %{state.mst | root: new_root}
- mgmst = put_in(mgmst.store.prefer_ask, [source_peer])
- mst = MST.merge(state.mst, mgmst, fn msgitem, true -> msg_callback(state, msgitem) end)
- %{state | mst: mst}
- end
-
- def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
- new_subs = MapSet.delete(state.subs, pid)
- {:noreply, %{ state | subs: new_subs }}
- end
-
- defp msg_callback(state, {ts, nick, msg}) do
- for pid <- state.subs do
- if Process.alive?(pid) do
- send(pid, {:chat_recv, state.channel, {ts, nick, msg}})
- end
- end
- end
-
- defp msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do
- cond do
- ts1 > ts2 -> :after
- ts1 < ts2 -> :before
- nick1 > nick2 -> :after
- nick1 < nick2 -> :before
- msg1 > msg2 -> :after
- msg1 < msg2 -> :before
- true -> :duplicate
- end
- end
-end