diff options
author | Alex Auvolat <alex@adnab.me> | 2018-09-01 16:06:23 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2018-09-01 16:06:23 +0200 |
commit | c6ec33d6e612168e14d77007915a4ea423c55a2e (patch) | |
tree | 8b5645651a0cc991b8ac9c68c388d84c8dbe73d2 /lib/app/chat.ex | |
parent | 1a0ef154a421af60f6d57dfe861dacb844a7d142 (diff) | |
download | shard-c6ec33d6e612168e14d77007915a4ea423c55a2e.tar.gz shard-c6ec33d6e612168e14d77007915a4ea423c55a2e.zip |
Move everything to subdirectory
Diffstat (limited to 'lib/app/chat.ex')
-rw-r--r-- | lib/app/chat.ex | 208 |
1 files changed, 0 insertions, 208 deletions
diff --git a/lib/app/chat.ex b/lib/app/chat.ex deleted file mode 100644 index 051fab6..0000000 --- a/lib/app/chat.ex +++ /dev/null @@ -1,208 +0,0 @@ -defmodule SApp.Chat do - @moduledoc """ - Shard application for a replicated chat room with full history. - - Chat rooms are globally identified by their channel name. - A chat room manifest is of the form: - - {:chat, channel_name} - - Future improvements: - - message signing - - storage of the chatroom messages to disk - - storage of the known peers that have this channel to disk - - use a DHT to find peers that are interested in this channel - - epidemic broadcast (carefull not to be too costly, - maybe by limiting the number of peers we talk to) - - partial synchronization only == data distributed over peers - """ - - use GenServer - - require Logger - alias SData.MerkleSearchTree, as: MST - - @doc """ - Start a process that connects to a given channel - """ - def start_link(channel) do - GenServer.start_link(__MODULE__, channel) - end - - @doc """ - Initialize channel process. - """ - def init(channel) do - manifest = {:chat, channel} - id = SData.term_hash manifest - - case Shard.Manager.register(id, manifest, self()) do - :ok -> - Shard.Manager.dispatch_to(id, nil, self()) - {:ok, block_store} = SApp.BlockStore.start_link(id, :block_store) - mst = %MST{store: %SApp.BlockStore{pid: block_store}, - cmp: &msg_cmp/2} - GenServer.cast(self(), :init_pull) - {:ok, - %{channel: channel, - id: id, - manifest: manifest, - block_store: block_store, - mst: mst, - subs: MapSet.new, - } - } - :redundant -> - exit(:redundant) - end - end - - @doc """ - Implementation of the :manifest call that returns the chat room's manifest - """ - def handle_call(:manifest, _from, state) do - {:reply, state.manifest, state} - end - - def handle_call({:read_history, top_bound, num}, _from, state) do - ret = MST.last(state.mst, top_bound, num) - {:reply, ret, state} - end - - @doc """ - Implementation of the :init_pull handler, which is called when the - process starts. It contacts all currently connected peers and asks them to - send data for this channel if they have some. - """ - def handle_cast(:init_pull, state) do - for {_, pid, _, _} <- :ets.tab2list(:peer_db) do - GenServer.cast(pid, {:send_msg, {:interested, [state.id]}}) - end - {:noreply, state} - end - - @doc """ - Implementation of the :chat_send handler. This is the main handler that is used - to send a message to the chat room. Puts the message in the store and syncs - with all connected peers. - """ - def handle_cast({:chat_send, msg}, state) do - msgitem = {(System.os_time :seconds), - Shard.Identity.get_nickname(), - msg} - prev_root = state.mst.root - mst = MST.insert(state.mst, msgitem) - state = %{state | mst: mst} - - for pid <- state.subs do - if Process.alive?(pid) do - send(pid, {:chat_send, state.channel, msgitem}) - end - end - - notif = {:append, prev_root, msgitem, mst.root} - for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do - Shard.Manager.send(peer_id, {state.id, nil, notif}) - end - - {:noreply, state} - end - - @doc """ - Implementation of the :interested handler, this is called when a peer we are - connected to asks to recieve data for this channel. - """ - def handle_cast({:interested, peer_id}, state) do - Shard.Manager.send(peer_id, {state.id, nil, {:root, state.mst.root}}) - {:noreply, state} - end - - def handle_cast({:subscribe, pid}, state) do - Process.monitor(pid) - new_subs = MapSet.put(state.subs, pid) - {:noreply, %{ state | subs: new_subs }} - end - - @doc """ - Implementation of the :msg handler, which is the main handler for messages - comming from other peers concerning this chat room. - - Messages are: - - `{:get, start}`: get some messages starting at a given Merkle hash - - `{:info, start, list, rest}`: put some messages and informs of the - Merkle hash of the store of older messages. - """ - def handle_cast({:msg, peer_id, _shard_id, nil, msg}, state) do - state = case msg do - {:get_manifest} -> - Shard.Manager.send(peer_id, {state.id, nil, {:manifest, state.manifest}}) - state - {:append, prev_root, msgitem, new_root} -> - # Append message: one single mesage has arrived - if new_root == state.mst.root do - # We already have the message, do nothing - state - else - # Try adding the message - if prev_root == state.mst.root do - mst2 = MST.insert(state.mst, msgitem) - if mst2.root == new_root do - # This was the only message missing, we are happy! - state = %{state | mst: mst2} - msg_callback(state, msgitem) - state - else - # More messages are missing, start a full merge - init_merge(state, new_root, peer_id) - end - else - init_merge(state, new_root, peer_id) - end - end - {:root, new_root} -> - if new_root == state.mst.root do - # already up to date, ignore - state - else - init_merge(state, new_root, peer_id) - end - x -> - Logger.info("Unhandled message: #{inspect x}") - state - end - {:noreply, state} - end - - defp init_merge(state, new_root, source_peer) do - # TODO: make the merge asynchronous - mgmst = %{state.mst | root: new_root} - mgmst = put_in(mgmst.store.prefer_ask, [source_peer]) - mst = MST.merge(state.mst, mgmst, fn msgitem, true -> msg_callback(state, msgitem) end) - %{state | mst: mst} - end - - def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do - new_subs = MapSet.delete(state.subs, pid) - {:noreply, %{ state | subs: new_subs }} - end - - defp msg_callback(state, {ts, nick, msg}) do - for pid <- state.subs do - if Process.alive?(pid) do - send(pid, {:chat_recv, state.channel, {ts, nick, msg}}) - end - end - end - - defp msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do - cond do - ts1 > ts2 -> :after - ts1 < ts2 -> :before - nick1 > nick2 -> :after - nick1 < nick2 -> :before - msg1 > msg2 -> :after - msg1 < msg2 -> :before - true -> :duplicate - end - end -end |