diff options
-rw-r--r-- | lib/app/chat.ex | 62 | ||||
-rw-r--r-- | lib/application.ex | 5 | ||||
-rw-r--r-- | lib/cli/cli.ex | 14 | ||||
-rw-r--r-- | lib/data/data.ex | 14 | ||||
-rw-r--r-- | lib/data/merklelist.ex | 29 |
5 files changed, 102 insertions, 22 deletions
diff --git a/lib/app/chat.ex b/lib/app/chat.ex index f165514..e86f739 100644 --- a/lib/app/chat.ex +++ b/lib/app/chat.ex @@ -1,14 +1,37 @@ defmodule SApp.Chat do + @moduledoc """ + Shard application for a replicated chat room with full history. + + Chat rooms are globally identified by their channel name. + A chat room manifest is of the form: + + {:chat, channel_name} + + Future improvements: + - message signing + - storage of the chatroom messages to disk + - storage of the known peers that have this channel to disk + - use a DHT to find peers that are interested in this channel + - epidemic broadcast (carefull not to be too costly, + maybe by limiting the number of peers we talk to) + """ + use GenServer + @doc """ + Start a process that connects to a given channel + """ def start_link(channel) do GenServer.start_link(__MODULE__, channel) end + @doc """ + Initialize channel process. + """ def init(channel) do {:ok, store} = SData.MerkleList.start_link(&msg_cmp/2) manifest = {:chat, channel} - id = :crypto.hash(:sha256, :erlang.term_to_binary(manifest)) + id = SData.term_hash manifest GenServer.cast(Shard.Manager, {:register, id, self()}) GenServer.cast(self(), :init_pull) @@ -16,20 +39,37 @@ defmodule SApp.Chat do {:ok, %{channel: channel, id: id, manifest: manifest, store: store, peers: %{}}} end + @doc """ + Implementation of the :manifest call that returns the chat room's manifest + """ def handle_call(:manifest, _from, state) do {:reply, state.manifest, state} end + @doc """ + Implementation of the :redundant handler: if another process is already + synchronizing this channel then we exit. + """ def handle_cast({:redundant, _}, _state) do exit :normal end + @doc """ + Implementation of the :init_pull handler, which is called when the + process starts. It contacts all currently connected peers and asks them to + send data for this channel if they have some. + """ def handle_cast(:init_pull, state) do GenServer.call(SNet.Manager, :get_all) |> Enum.each(&(GenServer.cast(&1, {:send_msg, {:interested, [state.id]}}))) {:noreply, state} end + @doc """ + Implementation of the :chat_send handler. This is the main handler that is used + to send a message to the chat room. Puts the message in the store and syncs + with all connected peers. + """ def handle_cast({:chat_send, msg}, state) do msgitem = {(System.os_time :seconds), Shard.Identity.get_nickname(), @@ -43,15 +83,27 @@ defmodule SApp.Chat do {:noreply, state} end + @doc """ + Implementation of the :interested handler, this is called when a peer we are + connected to asks to recieve data for this channel. + """ def handle_cast({:interested, peer_id, peer_pid}, state) do push_messages(state, peer_pid, nil, 10) new_peers = Map.put(state.peers, peer_id, peer_pid) {:noreply, %{ state | peers: new_peers }} end + @doc """ + Implementation of the :msg handler, which is the main handler for messages + comming from other peers concerning this chat room. + + Messages are: + - `{:get, start}`: get some messages starting at a given Merkle hash + - `{:info, start, list, rest}`: put some messages and informs of the + Merkle hash of the store of older messages. + """ def handle_cast({:msg, peer_id, peer_pid, msg}, state) do case msg do - :get_top -> push_messages(peer_id, state, nil, 10) {:get, start} -> push_messages(peer_id, state, start, 20) {:info, _start, list, rest} -> if rest != nil and not GenServer.call(state.store, {:has, rest}) do @@ -70,6 +122,7 @@ defmodule SApp.Chat do end end + defp push_messages(state, to, start, num) do case GenServer.call(state.store, {:read, start, num}) do {:ok, list, rest} -> @@ -78,12 +131,11 @@ defmodule SApp.Chat do end end - - def msg_callback(chan, {ts, nick, msg}) do + defp msg_callback(chan, {ts, nick, msg}) do IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} ##{chan} <#{nick}> #{msg}" end - def msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do + defp msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do SData.MerkleList.cmp_ts_str({ts1, nick1<>"|"<>msg1}, {ts2, nick2<>"|"<>msg2}) end diff --git a/lib/application.ex b/lib/application.ex index e375cf4..3ba6e12 100644 --- a/lib/application.ex +++ b/lib/application.ex @@ -1,6 +1,9 @@ defmodule Shard.Application do @moduledoc """ - Documentation for Shard. + Main Shard application. + + Shard is a prototype peer-to-peer comunication platform with data + synchronization. """ use Application diff --git a/lib/cli/cli.ex b/lib/cli/cli.ex index 0402b3f..d2e5f6a 100644 --- a/lib/cli/cli.ex +++ b/lib/cli/cli.ex @@ -1,4 +1,8 @@ defmodule SCLI do + @moduledoc """ + Small command line interface for the chat application + """ + def run() do run(nil) end @@ -26,14 +30,14 @@ defmodule SCLI do end end - def handle_command(pid, ["connect", ipstr, portstr]) do + defp handle_command(pid, ["connect", ipstr, portstr]) do {:ok, ip} = :inet.parse_address (to_charlist ipstr) {port, _} = Integer.parse portstr SNet.Manager.add_peer(ip, port) pid end - def handle_command(pid, ["list"]) do + defp handle_command(pid, ["list"]) do IO.puts "List of known channels:" list = GenServer.call(Shard.Manager, :list) for {_chid, chpid} <- list do @@ -43,7 +47,7 @@ defmodule SCLI do pid end - def handle_command(pid, ["join", qchan]) do + defp handle_command(pid, ["join", qchan]) do list = GenServer.call(Shard.Manager, :list) list = for {_chid, chpid} <- list, {:chat, chan} = GenServer.call(chpid, :manifest), @@ -58,12 +62,12 @@ defmodule SCLI do end end - def handle_command(pid, ["nick", nick]) do + defp handle_command(pid, ["nick", nick]) do Shard.Identity.set_nickname nick pid end - def handle_command(pid, _cmd) do + defp handle_command(pid, _cmd) do IO.puts "Invalid command" pid end diff --git a/lib/data/data.ex b/lib/data/data.ex new file mode 100644 index 0000000..2c6e629 --- /dev/null +++ b/lib/data/data.ex @@ -0,0 +1,14 @@ +defmodule SData do + @moduledoc """ + Utility functions + """ + + @doc """ + Calculate the hash of an Erlang term by first converting it to its + binary representation. + """ + def term_hash(term) do + :crypto.hash(:sha256, (:erlang.term_to_binary term)) + end + +end diff --git a/lib/data/merklelist.ex b/lib/data/merklelist.ex index 42b80fc..727f2a8 100644 --- a/lib/data/merklelist.ex +++ b/lib/data/merklelist.ex @@ -1,26 +1,33 @@ defmodule SData.MerkleList do + @moduledoc """ + A simple Merkle list store. + + Future improvements: + - When messages are inserted other than at the top, all intermediate hashes + change. Keep track of the mapping from old hashes to new hashes so that get + requests can work even for hashes that are not valid anymore. + - group items in "pages" (bigger bundles) + """ + use GenServer - def start_link(cmp) do - GenServer.start_link(__MODULE__, cmp) - end + @doc """ + Start a Merkle List storage process. - defp term_hash(term) do - :crypto.hash(:sha256, (:erlang.term_to_binary term)) - end - @doc """ - Initialize a Merkle List storage. `cmp` is a function that compares stored items and provides a total order. - It must return: - `:after` if the first argument is more recent - `:duplicate` if the two items are the same - `:before` if the first argument is older """ + def start_link(cmp) do + GenServer.start_link(__MODULE__, cmp) + end + def init(cmp) do root_item = :root - root_hash = term_hash root_item + root_hash = SData.term_hash root_item state = %{ root: root_hash, top: root_hash, @@ -32,7 +39,7 @@ defmodule SData.MerkleList do defp state_push(item, state) do new_item = {item, state.top} - new_item_hash = term_hash new_item + new_item_hash = SData.term_hash new_item new_store = Map.put(state.store, new_item_hash, new_item) %{ state | :top => new_item_hash, :store => new_store } end |