aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/app/chat.ex62
-rw-r--r--lib/application.ex5
-rw-r--r--lib/cli/cli.ex14
-rw-r--r--lib/data/data.ex14
-rw-r--r--lib/data/merklelist.ex29
5 files changed, 102 insertions, 22 deletions
diff --git a/lib/app/chat.ex b/lib/app/chat.ex
index f165514..e86f739 100644
--- a/lib/app/chat.ex
+++ b/lib/app/chat.ex
@@ -1,14 +1,37 @@
defmodule SApp.Chat do
+ @moduledoc """
+ Shard application for a replicated chat room with full history.
+
+ Chat rooms are globally identified by their channel name.
+ A chat room manifest is of the form:
+
+ {:chat, channel_name}
+
+ Future improvements:
+ - message signing
+ - storage of the chatroom messages to disk
+ - storage of the known peers that have this channel to disk
+ - use a DHT to find peers that are interested in this channel
+ - epidemic broadcast (carefull not to be too costly,
+ maybe by limiting the number of peers we talk to)
+ """
+
use GenServer
+ @doc """
+ Start a process that connects to a given channel
+ """
def start_link(channel) do
GenServer.start_link(__MODULE__, channel)
end
+ @doc """
+ Initialize channel process.
+ """
def init(channel) do
{:ok, store} = SData.MerkleList.start_link(&msg_cmp/2)
manifest = {:chat, channel}
- id = :crypto.hash(:sha256, :erlang.term_to_binary(manifest))
+ id = SData.term_hash manifest
GenServer.cast(Shard.Manager, {:register, id, self()})
GenServer.cast(self(), :init_pull)
@@ -16,20 +39,37 @@ defmodule SApp.Chat do
{:ok, %{channel: channel, id: id, manifest: manifest, store: store, peers: %{}}}
end
+ @doc """
+ Implementation of the :manifest call that returns the chat room's manifest
+ """
def handle_call(:manifest, _from, state) do
{:reply, state.manifest, state}
end
+ @doc """
+ Implementation of the :redundant handler: if another process is already
+ synchronizing this channel then we exit.
+ """
def handle_cast({:redundant, _}, _state) do
exit :normal
end
+ @doc """
+ Implementation of the :init_pull handler, which is called when the
+ process starts. It contacts all currently connected peers and asks them to
+ send data for this channel if they have some.
+ """
def handle_cast(:init_pull, state) do
GenServer.call(SNet.Manager, :get_all)
|> Enum.each(&(GenServer.cast(&1, {:send_msg, {:interested, [state.id]}})))
{:noreply, state}
end
+ @doc """
+ Implementation of the :chat_send handler. This is the main handler that is used
+ to send a message to the chat room. Puts the message in the store and syncs
+ with all connected peers.
+ """
def handle_cast({:chat_send, msg}, state) do
msgitem = {(System.os_time :seconds),
Shard.Identity.get_nickname(),
@@ -43,15 +83,27 @@ defmodule SApp.Chat do
{:noreply, state}
end
+ @doc """
+ Implementation of the :interested handler, this is called when a peer we are
+ connected to asks to recieve data for this channel.
+ """
def handle_cast({:interested, peer_id, peer_pid}, state) do
push_messages(state, peer_pid, nil, 10)
new_peers = Map.put(state.peers, peer_id, peer_pid)
{:noreply, %{ state | peers: new_peers }}
end
+ @doc """
+ Implementation of the :msg handler, which is the main handler for messages
+ comming from other peers concerning this chat room.
+
+ Messages are:
+ - `{:get, start}`: get some messages starting at a given Merkle hash
+ - `{:info, start, list, rest}`: put some messages and informs of the
+ Merkle hash of the store of older messages.
+ """
def handle_cast({:msg, peer_id, peer_pid, msg}, state) do
case msg do
- :get_top -> push_messages(peer_id, state, nil, 10)
{:get, start} -> push_messages(peer_id, state, start, 20)
{:info, _start, list, rest} ->
if rest != nil and not GenServer.call(state.store, {:has, rest}) do
@@ -70,6 +122,7 @@ defmodule SApp.Chat do
end
end
+
defp push_messages(state, to, start, num) do
case GenServer.call(state.store, {:read, start, num}) do
{:ok, list, rest} ->
@@ -78,12 +131,11 @@ defmodule SApp.Chat do
end
end
-
- def msg_callback(chan, {ts, nick, msg}) do
+ defp msg_callback(chan, {ts, nick, msg}) do
IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} ##{chan} <#{nick}> #{msg}"
end
- def msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do
+ defp msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do
SData.MerkleList.cmp_ts_str({ts1, nick1<>"|"<>msg1},
{ts2, nick2<>"|"<>msg2})
end
diff --git a/lib/application.ex b/lib/application.ex
index e375cf4..3ba6e12 100644
--- a/lib/application.ex
+++ b/lib/application.ex
@@ -1,6 +1,9 @@
defmodule Shard.Application do
@moduledoc """
- Documentation for Shard.
+ Main Shard application.
+
+ Shard is a prototype peer-to-peer comunication platform with data
+ synchronization.
"""
use Application
diff --git a/lib/cli/cli.ex b/lib/cli/cli.ex
index 0402b3f..d2e5f6a 100644
--- a/lib/cli/cli.ex
+++ b/lib/cli/cli.ex
@@ -1,4 +1,8 @@
defmodule SCLI do
+ @moduledoc """
+ Small command line interface for the chat application
+ """
+
def run() do
run(nil)
end
@@ -26,14 +30,14 @@ defmodule SCLI do
end
end
- def handle_command(pid, ["connect", ipstr, portstr]) do
+ defp handle_command(pid, ["connect", ipstr, portstr]) do
{:ok, ip} = :inet.parse_address (to_charlist ipstr)
{port, _} = Integer.parse portstr
SNet.Manager.add_peer(ip, port)
pid
end
- def handle_command(pid, ["list"]) do
+ defp handle_command(pid, ["list"]) do
IO.puts "List of known channels:"
list = GenServer.call(Shard.Manager, :list)
for {_chid, chpid} <- list do
@@ -43,7 +47,7 @@ defmodule SCLI do
pid
end
- def handle_command(pid, ["join", qchan]) do
+ defp handle_command(pid, ["join", qchan]) do
list = GenServer.call(Shard.Manager, :list)
list = for {_chid, chpid} <- list,
{:chat, chan} = GenServer.call(chpid, :manifest),
@@ -58,12 +62,12 @@ defmodule SCLI do
end
end
- def handle_command(pid, ["nick", nick]) do
+ defp handle_command(pid, ["nick", nick]) do
Shard.Identity.set_nickname nick
pid
end
- def handle_command(pid, _cmd) do
+ defp handle_command(pid, _cmd) do
IO.puts "Invalid command"
pid
end
diff --git a/lib/data/data.ex b/lib/data/data.ex
new file mode 100644
index 0000000..2c6e629
--- /dev/null
+++ b/lib/data/data.ex
@@ -0,0 +1,14 @@
+defmodule SData do
+ @moduledoc """
+ Utility functions
+ """
+
+ @doc """
+ Calculate the hash of an Erlang term by first converting it to its
+ binary representation.
+ """
+ def term_hash(term) do
+ :crypto.hash(:sha256, (:erlang.term_to_binary term))
+ end
+
+end
diff --git a/lib/data/merklelist.ex b/lib/data/merklelist.ex
index 42b80fc..727f2a8 100644
--- a/lib/data/merklelist.ex
+++ b/lib/data/merklelist.ex
@@ -1,26 +1,33 @@
defmodule SData.MerkleList do
+ @moduledoc """
+ A simple Merkle list store.
+
+ Future improvements:
+ - When messages are inserted other than at the top, all intermediate hashes
+ change. Keep track of the mapping from old hashes to new hashes so that get
+ requests can work even for hashes that are not valid anymore.
+ - group items in "pages" (bigger bundles)
+ """
+
use GenServer
- def start_link(cmp) do
- GenServer.start_link(__MODULE__, cmp)
- end
+ @doc """
+ Start a Merkle List storage process.
- defp term_hash(term) do
- :crypto.hash(:sha256, (:erlang.term_to_binary term))
- end
- @doc """
- Initialize a Merkle List storage.
`cmp` is a function that compares stored items and provides a total order.
-
It must return:
- `:after` if the first argument is more recent
- `:duplicate` if the two items are the same
- `:before` if the first argument is older
"""
+ def start_link(cmp) do
+ GenServer.start_link(__MODULE__, cmp)
+ end
+
def init(cmp) do
root_item = :root
- root_hash = term_hash root_item
+ root_hash = SData.term_hash root_item
state = %{
root: root_hash,
top: root_hash,
@@ -32,7 +39,7 @@ defmodule SData.MerkleList do
defp state_push(item, state) do
new_item = {item, state.top}
- new_item_hash = term_hash new_item
+ new_item_hash = SData.term_hash new_item
new_store = Map.put(state.store, new_item_hash, new_item)
%{ state | :top => new_item_hash, :store => new_store }
end