aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/app/chat.ex21
-rw-r--r--lib/application.ex33
-rw-r--r--lib/cli/cli.ex30
-rw-r--r--lib/data/merklelist.ex136
-rw-r--r--lib/identity.ex32
-rw-r--r--lib/net/tcpconn.ex134
-rw-r--r--lib/net/tcpserver.ex35
-rw-r--r--lib/web/httprouter.ex53
8 files changed, 474 insertions, 0 deletions
diff --git a/lib/app/chat.ex b/lib/app/chat.ex
new file mode 100644
index 0000000..4a56085
--- /dev/null
+++ b/lib/app/chat.ex
@@ -0,0 +1,21 @@
+defmodule SApp.Chat do
+ def send(msg) do
+ msgitem = {(System.os_time :seconds),
+ Shard.Identity.get_nickname(),
+ msg}
+ GenServer.cast(SApp.Chat.Log, {:insert, msgitem})
+
+ SNet.ConnSupervisor
+ |> DynamicSupervisor.which_children
+ |> Enum.each(fn {_, pid, _, _} -> GenServer.cast(pid, :init_push) end)
+ end
+
+ def msg_callback({ts, nick, msg}) do
+ IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}"
+ end
+
+ def msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do
+ SData.MerkleList.cmp_ts_str({ts1, nick1<>"|"<>msg1},
+ {ts2, nick2<>"|"<>msg2})
+ end
+end
diff --git a/lib/application.ex b/lib/application.ex
new file mode 100644
index 0000000..a199e6c
--- /dev/null
+++ b/lib/application.ex
@@ -0,0 +1,33 @@
+defmodule Shard.Application do
+ @moduledoc """
+ Documentation for Shard.
+ """
+
+ use Application
+
+ def start(_type, _args) do
+ import Supervisor.Spec, warn: false
+
+ {listen_port, _} = Integer.parse ((System.get_env "PORT") || "4044")
+
+ # Define workers and child supervisors to be supervised
+ children = [
+ Shard.Identity,
+
+ # Networking
+ { DynamicSupervisor, strategy: :one_for_one, name: SNet.ConnSupervisor },
+ { SNet.TCPServer, listen_port },
+
+ # Applications & data store
+ { SData.MerkleList, [&SApp.Chat.msg_cmp/2, name: SApp.Chat.Log] },
+
+ # Web UI
+ Plug.Adapters.Cowboy.child_spec(:http, SWeb.HTTPRouter, [], port: listen_port + 1000)
+ ]
+
+ # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
+ # for other strategies and supported options
+ opts = [strategy: :one_for_one, name: Shard.Supervisor]
+ Supervisor.start_link(children, opts)
+ end
+end
diff --git a/lib/cli/cli.ex b/lib/cli/cli.ex
new file mode 100644
index 0000000..4643351
--- /dev/null
+++ b/lib/cli/cli.ex
@@ -0,0 +1,30 @@
+defmodule SCLI do
+ def run() do
+ str = "say: " |> IO.gets |> String.trim
+ cond do
+ str == "/quit" ->
+ nil
+ String.slice(str, 0..0) == "/" ->
+ command = str |> String.slice(1..-1) |> String.split(" ")
+ handle_command(command)
+ run()
+ true ->
+ SApp.Chat.send(str)
+ run()
+ end
+ end
+
+ def handle_command(["connect", ipstr, portstr]) do
+ {:ok, ip} = :inet.parse_address (to_charlist ipstr)
+ {port, _} = Integer.parse portstr
+ SNet.TCPServer.add_peer(ip, port)
+ end
+
+ def handle_command(["nick", nick]) do
+ Shard.Identity.set_nickname nick
+ end
+
+ def handle_command(_cmd) do
+ IO.puts "Invalid command"
+ end
+end
diff --git a/lib/data/merklelist.ex b/lib/data/merklelist.ex
new file mode 100644
index 0000000..c9e27f6
--- /dev/null
+++ b/lib/data/merklelist.ex
@@ -0,0 +1,136 @@
+defmodule SData.MerkleList do
+ use GenServer
+
+ def start_link([cmp, name: name]) do
+ GenServer.start_link(__MODULE__, cmp, [name: name])
+ end
+
+ defp term_hash(term) do
+ :crypto.hash(:sha256, (:erlang.term_to_binary term))
+ end
+
+ @doc """
+ Initialize a Merkle List storage.
+ `cmp` is a function that compares stored items and provides a total order.
+
+ It must return:
+ - `:after` if the first argument is more recent
+ - `:duplicate` if the two items are the same
+ - `:before` if the first argument is older
+ """
+ def init(cmp) do
+ root_item = :root
+ root_hash = term_hash root_item
+ state = %{
+ root: root_hash,
+ top: root_hash,
+ cmp: cmp,
+ store: %{ root_hash => root_item }
+ }
+ {:ok, state}
+ end
+
+ defp state_push(item, state) do
+ new_item = {item, state.top}
+ new_item_hash = term_hash new_item
+ new_store = Map.put(state.store, new_item_hash, new_item)
+ %{ state | :top => new_item_hash, :store => new_store }
+ end
+
+ defp state_pop(state) do
+ if state.top == state.root do
+ :error
+ else
+ {item, next} = Map.get(state.store, state.top)
+ new_store = Map.delete(state.store, state.top)
+ new_state = %{ state | :top => next, :store => new_store }
+ {:ok, item, new_state}
+ end
+ end
+
+ defp insert_many(state, [], _callback) do
+ state
+ end
+
+ defp insert_many(state, [item | rest], callback) do
+ case state_pop(state) do
+ :error ->
+ new_state = state_push(item, insert_many(state, rest, callback))
+ callback.(item)
+ new_state
+ {:ok, front, state_rest} ->
+ case state.cmp.(item, front) do
+ :after ->
+ new_state = state_push(item, insert_many(state, rest, callback))
+ callback.(item)
+ new_state
+ :duplicate -> insert_many(state, rest, callback)
+ :before -> state_push(front, insert_many(state_rest, [item | rest], callback))
+ end
+ end
+ end
+
+ def handle_cast({:insert, item}, state) do
+ handle_cast({:insert_many, [item]}, state)
+ end
+
+ def handle_cast({:insert_many, items}, state) do
+ handle_cast({:insert_many, items, fn _ -> nil end}, state)
+ end
+
+ def handle_cast({:insert_many, items, callback}, state) do
+ items_sorted = Enum.sort(items, fn (x, y) -> state.cmp.(x, y) == :after end)
+ new_state = insert_many(state, items_sorted, callback)
+ {:noreply, new_state}
+ end
+
+ def handle_call({:read, qbegin, qlimit}, _from, state) do
+ begin = qbegin || state.top
+ limit = qlimit || 20
+ items = get_items_list(state, begin, limit)
+ {:reply, items, state}
+ end
+
+ def handle_call(:top, _from, state) do
+ {:reply, state.top, state}
+ end
+
+ def handle_call(:root, _from, state) do
+ {:reply, state.root, state}
+ end
+
+ def handle_call({:has, hash}, _from, state) do
+ {:reply, Map.has_key?(state.store, hash), state}
+ end
+
+ defp get_items_list(state, begin, limit) do
+ case limit do
+ 0 -> {:ok, [], begin}
+ _ ->
+ case Map.fetch(state.store, begin) do
+ {:ok, :root} ->
+ {:ok, [], nil }
+ {:ok, {item, next}} ->
+ case get_items_list(state, next, limit - 1) do
+ {:ok, rest, past} ->
+ {:ok, [ item | rest ], past }
+ {:error, reason} -> {:error, reason}
+ end
+ :error -> {:error, begin}
+ end
+ end
+ end
+
+ @doc """
+ Compare function for timestamped strings
+ """
+ def cmp_ts_str({ts1, str1}, {ts2, str2}) do
+ cond do
+ ts1 > ts2 -> :after
+ ts1 < ts2 -> :before
+ str1 == str2 -> :duplicate
+ str1 > str2 -> :after
+ str1 < str2 -> :before
+ end
+ end
+end
diff --git a/lib/identity.ex b/lib/identity.ex
new file mode 100644
index 0000000..229d6c7
--- /dev/null
+++ b/lib/identity.ex
@@ -0,0 +1,32 @@
+defmodule Shard.Identity do
+ use Agent
+ require Salty.Sign.Ed25519, as: Sign
+
+ def start_link(_) do
+ Agent.start_link(__MODULE__, :init, [], name: __MODULE__)
+ end
+
+ def init() do
+ {:ok, pk, sk} = Sign.keypair
+ nick_suffix = pk
+ |> binary_part(0, 3)
+ |> Base.encode16
+ |> String.downcase
+ %{
+ keypair: {pk, sk},
+ nickname: "Anon" <> nick_suffix,
+ }
+ end
+
+ def get_keypair() do
+ Agent.get(__MODULE__, &(&1.keypair))
+ end
+
+ def get_nickname() do
+ Agent.get(__MODULE__, &(&1.nickname))
+ end
+
+ def set_nickname(newnick) do
+ Agent.update(__MODULE__, &(%{&1 | nickname: newnick}))
+ end
+end
diff --git a/lib/net/tcpconn.ex b/lib/net/tcpconn.ex
new file mode 100644
index 0000000..5d6c912
--- /dev/null
+++ b/lib/net/tcpconn.ex
@@ -0,0 +1,134 @@
+defmodule SNet.TCPConn do
+ use GenServer, restart: :temporary
+ require Salty.Box.Curve25519xchacha20poly1305, as: Box
+ require Salty.Sign.Ed25519, as: Sign
+ require Logger
+
+ def start_link(state) do
+ GenServer.start_link(__MODULE__, state)
+ end
+
+ def init(state) do
+ GenServer.cast(self(), :handshake)
+ {:ok, state}
+ end
+
+ def handle_call(:get_host_str, _from, state) do
+ {:reply, "#{state.his_pkey|>Base.encode16|>String.downcase}@#{to_string(:inet_parse.ntoa(state.addr))}:#{state.port}", state}
+ end
+
+ def handle_cast(:handshake, state) do
+ socket = state.socket
+
+ {srv_pkey, srv_skey} = Shard.Identity.get_keypair
+ {:ok, sess_pkey, sess_skey} = Box.keypair
+ {:ok, challenge} = Salty.Random.buf 32
+
+ # Exchange public keys and challenge
+ :gen_tcp.send(socket, srv_pkey <> sess_pkey <> challenge)
+ {:ok, pkt} = :gen_tcp.recv(socket, 0)
+ cli_pkey = binary_part(pkt, 0, Sign.publickeybytes)
+ cli_sess_pkey = binary_part(pkt, Sign.publickeybytes, Box.publickeybytes)
+ cli_challenge = binary_part(pkt, Sign.publickeybytes + Box.publickeybytes, 32)
+
+ # Do challenge and check their challenge
+ {:ok, cli_challenge_sign} = Sign.sign_detached(cli_challenge, srv_skey)
+ pkt = encode_pkt(cli_challenge_sign, cli_sess_pkey, sess_skey)
+ :gen_tcp.send(socket, pkt)
+
+ {:ok, pkt} = :gen_tcp.recv(socket, 0)
+ challenge_sign = decode_pkt(pkt, cli_sess_pkey, sess_skey)
+ :ok = Sign.verify_detached(challenge_sign, challenge, cli_pkey)
+
+ # Connected
+ :inet.setopts(socket, [active: true])
+
+ {:ok, {addr, port}} = :inet.peername socket
+ state =%{ socket: socket,
+ my_pkey: srv_pkey,
+ my_skey: srv_skey,
+ his_pkey: cli_pkey,
+ conn_my_pkey: sess_pkey,
+ conn_my_skey: sess_skey,
+ conn_his_pkey: cli_sess_pkey,
+ addr: addr,
+ port: port
+ }
+ Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}"
+
+ GenServer.cast(self(), :init_push)
+
+ {:noreply, state}
+ end
+
+ def handle_cast({:send_msg, msg}, state) do
+ send_msg(state, msg)
+ {:noreply, state}
+ end
+
+ def handle_cast(:init_push, state) do
+ push_messages(state, nil, 10)
+ {:noreply, state}
+ end
+
+ defp encode_pkt(pkt, pk, sk) do
+ {:ok, n} = Salty.Random.buf Box.noncebytes
+ {:ok, msg} = Box.easy(pkt, n, pk, sk)
+ n <> msg
+ end
+
+ defp decode_pkt(pkt, pk, sk) do
+ n = binary_part(pkt, 0, Box.noncebytes)
+ enc = binary_part(pkt, Box.noncebytes, (byte_size pkt) - Box.noncebytes)
+ {:ok, msg} = Box.open_easy(enc, n, pk, sk)
+ msg
+ end
+
+ defp send_msg(state, msg) do
+ msgbin = :erlang.term_to_binary msg
+ enc = encode_pkt(msgbin, state.conn_his_pkey, state.conn_my_skey)
+ :gen_tcp.send(state.socket, enc)
+ end
+
+ def handle_info({:tcp, _socket, raw_data}, state) do
+ msg = decode_pkt(raw_data, state.conn_his_pkey, state.conn_my_skey)
+ handle_packet(:erlang.binary_to_term(msg, [:safe]), state)
+ {:noreply, state}
+ end
+
+ def handle_info({:tcp_closed, _socket}, state) do
+ Logger.info "Disconnected: #{print_id state} at #{inspect state.addr}:#{state.port}"
+ exit(:normal)
+ end
+
+ defp push_messages(state, start, num) do
+ case GenServer.call(SApp.Chat.Log, {:read, start, num}) do
+ {:ok, list, rest} ->
+ send_msg(state, {:info, start, list, rest})
+ _ -> nil
+ end
+ end
+
+ defp handle_packet(msg, state) do
+ # Logger.info "Message: #{inspect msg}"
+ case msg do
+ :get_top -> push_messages(state, nil, 10)
+ {:get, start} -> push_messages(state, start, 20)
+ {:info, _start, list, rest} ->
+ if rest != nil and not GenServer.call(SApp.Chat.Log, {:has, rest}) do
+ send_msg(state, {:get, rest})
+ end
+ spawn_link(fn ->
+ Process.sleep 1000
+ GenServer.cast(SApp.Chat.Log, {:insert_many, list, &SApp.Chat.msg_callback/1})
+ end)
+ end
+ end
+
+ defp print_id(state) do
+ state.his_pkey
+ |> binary_part(0, 8)
+ |> Base.encode16
+ |> String.downcase
+ end
+end
diff --git a/lib/net/tcpserver.ex b/lib/net/tcpserver.ex
new file mode 100644
index 0000000..e5ee996
--- /dev/null
+++ b/lib/net/tcpserver.ex
@@ -0,0 +1,35 @@
+defmodule SNet.TCPServer do
+ require Logger
+ use Task, restart: :permanent
+
+ def start_link(port) do
+ Task.start_link(__MODULE__, :accept, [port])
+ end
+
+
+ @doc """
+ Starts accepting connections on the given `port`.
+ """
+ def accept(port) do
+ {:ok, socket} = :gen_tcp.listen(port,
+ [:binary, packet: 2, active: false, reuseaddr: true])
+ Logger.info "Accepting connections on port #{port}"
+ loop_acceptor(socket)
+ end
+
+ defp loop_acceptor(socket) do
+ {:ok, client} = :gen_tcp.accept(socket)
+ {:ok, pid} = DynamicSupervisor.start_child(SNet.ConnSupervisor, {SNet.TCPConn, %{socket: client}})
+ :ok = :gen_tcp.controlling_process(client, pid)
+ loop_acceptor(socket)
+ end
+
+ def add_peer(ip, port) do
+ {:ok, client} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false])
+ {:ok, pid} = DynamicSupervisor.start_child(SNet.ConnSupervisor, {SNet.TCPConn, %{socket: client}})
+ :ok = :gen_tcp.controlling_process(client, pid)
+ pid
+ end
+
+end
+
diff --git a/lib/web/httprouter.ex b/lib/web/httprouter.ex
new file mode 100644
index 0000000..5027df4
--- /dev/null
+++ b/lib/web/httprouter.ex
@@ -0,0 +1,53 @@
+defmodule SWeb.HTTPRouter do
+ use Plug.Router
+ use Plug.ErrorHandler
+
+ plug Plug.Parsers, parsers: [:urlencoded, :multipart]
+
+ plug :match
+ plug :dispatch
+
+ get "/" do
+ main_page(conn)
+ end
+
+ post "/" do
+ if Map.has_key?(conn.params, "msg") do
+ SApp.Chat.send(conn.params["msg"])
+ end
+ if Map.has_key?(conn.params, "nick") do
+ Shard.Identity.set_nickname(conn.params["nick"])
+ end
+ if Map.has_key?(conn.params, "peer") do
+ [ipstr, portstr] = String.split(conn.params["peer"], ":")
+ {:ok, ip} = :inet.parse_address (to_charlist ipstr)
+ {port, _} = Integer.parse portstr
+ SNet.TCPServer.add_peer(ip, port)
+ end
+
+ main_page(conn)
+ end
+
+ match _ do
+ send_resp(conn, 404, "Oops!")
+ end
+
+ def main_page(conn) do
+ {:ok, messages, _} = GenServer.call(SApp.Chat.Log, {:read, nil, 42})
+
+ msgtxt = messages
+ |> Enum.map(fn {ts, nick, msg} -> "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} &lt;#{nick}&gt; #{msg}\n" end)
+
+ peerlist = SNet.ConnSupervisor
+ |> DynamicSupervisor.which_children
+ |> Enum.map(fn {_, pid, _, _} -> "#{GenServer.call(pid, :get_host_str)}\n" end)
+
+ conn
+ |> put_resp_content_type("text/html")
+ |> send_resp(200, "<pre>#{msgtxt}</pre>" <>
+ "<form method=POST><input type=text name=msg /><input type=submit value=send /></form>" <>
+ "<form method=POST><input type=text name=nick value=\"#{Shard.Identity.get_nickname}\" /><input type=submit value=\"change nick\" /></form>" <>
+ "<hr/><pre>#{peerlist}</pre>" <>
+ "<form method=POST><input type=text name=peer /><input type=submit value=\"add peer\" /></form>")
+ end
+end