diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/app/chat.ex | 21 | ||||
-rw-r--r-- | lib/application.ex | 33 | ||||
-rw-r--r-- | lib/cli/cli.ex | 30 | ||||
-rw-r--r-- | lib/data/merklelist.ex | 136 | ||||
-rw-r--r-- | lib/identity.ex | 32 | ||||
-rw-r--r-- | lib/net/tcpconn.ex | 134 | ||||
-rw-r--r-- | lib/net/tcpserver.ex | 35 | ||||
-rw-r--r-- | lib/web/httprouter.ex | 53 |
8 files changed, 474 insertions, 0 deletions
diff --git a/lib/app/chat.ex b/lib/app/chat.ex new file mode 100644 index 0000000..4a56085 --- /dev/null +++ b/lib/app/chat.ex @@ -0,0 +1,21 @@ +defmodule SApp.Chat do + def send(msg) do + msgitem = {(System.os_time :seconds), + Shard.Identity.get_nickname(), + msg} + GenServer.cast(SApp.Chat.Log, {:insert, msgitem}) + + SNet.ConnSupervisor + |> DynamicSupervisor.which_children + |> Enum.each(fn {_, pid, _, _} -> GenServer.cast(pid, :init_push) end) + end + + def msg_callback({ts, nick, msg}) do + IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}" + end + + def msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do + SData.MerkleList.cmp_ts_str({ts1, nick1<>"|"<>msg1}, + {ts2, nick2<>"|"<>msg2}) + end +end diff --git a/lib/application.ex b/lib/application.ex new file mode 100644 index 0000000..a199e6c --- /dev/null +++ b/lib/application.ex @@ -0,0 +1,33 @@ +defmodule Shard.Application do + @moduledoc """ + Documentation for Shard. + """ + + use Application + + def start(_type, _args) do + import Supervisor.Spec, warn: false + + {listen_port, _} = Integer.parse ((System.get_env "PORT") || "4044") + + # Define workers and child supervisors to be supervised + children = [ + Shard.Identity, + + # Networking + { DynamicSupervisor, strategy: :one_for_one, name: SNet.ConnSupervisor }, + { SNet.TCPServer, listen_port }, + + # Applications & data store + { SData.MerkleList, [&SApp.Chat.msg_cmp/2, name: SApp.Chat.Log] }, + + # Web UI + Plug.Adapters.Cowboy.child_spec(:http, SWeb.HTTPRouter, [], port: listen_port + 1000) + ] + + # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html + # for other strategies and supported options + opts = [strategy: :one_for_one, name: Shard.Supervisor] + Supervisor.start_link(children, opts) + end +end diff --git a/lib/cli/cli.ex b/lib/cli/cli.ex new file mode 100644 index 0000000..4643351 --- /dev/null +++ b/lib/cli/cli.ex @@ -0,0 +1,30 @@ +defmodule SCLI do + def run() do + str = "say: " |> IO.gets |> String.trim + cond do + str == "/quit" -> + nil + String.slice(str, 0..0) == "/" -> + command = str |> String.slice(1..-1) |> String.split(" ") + handle_command(command) + run() + true -> + SApp.Chat.send(str) + run() + end + end + + def handle_command(["connect", ipstr, portstr]) do + {:ok, ip} = :inet.parse_address (to_charlist ipstr) + {port, _} = Integer.parse portstr + SNet.TCPServer.add_peer(ip, port) + end + + def handle_command(["nick", nick]) do + Shard.Identity.set_nickname nick + end + + def handle_command(_cmd) do + IO.puts "Invalid command" + end +end diff --git a/lib/data/merklelist.ex b/lib/data/merklelist.ex new file mode 100644 index 0000000..c9e27f6 --- /dev/null +++ b/lib/data/merklelist.ex @@ -0,0 +1,136 @@ +defmodule SData.MerkleList do + use GenServer + + def start_link([cmp, name: name]) do + GenServer.start_link(__MODULE__, cmp, [name: name]) + end + + defp term_hash(term) do + :crypto.hash(:sha256, (:erlang.term_to_binary term)) + end + + @doc """ + Initialize a Merkle List storage. + `cmp` is a function that compares stored items and provides a total order. + + It must return: + - `:after` if the first argument is more recent + - `:duplicate` if the two items are the same + - `:before` if the first argument is older + """ + def init(cmp) do + root_item = :root + root_hash = term_hash root_item + state = %{ + root: root_hash, + top: root_hash, + cmp: cmp, + store: %{ root_hash => root_item } + } + {:ok, state} + end + + defp state_push(item, state) do + new_item = {item, state.top} + new_item_hash = term_hash new_item + new_store = Map.put(state.store, new_item_hash, new_item) + %{ state | :top => new_item_hash, :store => new_store } + end + + defp state_pop(state) do + if state.top == state.root do + :error + else + {item, next} = Map.get(state.store, state.top) + new_store = Map.delete(state.store, state.top) + new_state = %{ state | :top => next, :store => new_store } + {:ok, item, new_state} + end + end + + defp insert_many(state, [], _callback) do + state + end + + defp insert_many(state, [item | rest], callback) do + case state_pop(state) do + :error -> + new_state = state_push(item, insert_many(state, rest, callback)) + callback.(item) + new_state + {:ok, front, state_rest} -> + case state.cmp.(item, front) do + :after -> + new_state = state_push(item, insert_many(state, rest, callback)) + callback.(item) + new_state + :duplicate -> insert_many(state, rest, callback) + :before -> state_push(front, insert_many(state_rest, [item | rest], callback)) + end + end + end + + def handle_cast({:insert, item}, state) do + handle_cast({:insert_many, [item]}, state) + end + + def handle_cast({:insert_many, items}, state) do + handle_cast({:insert_many, items, fn _ -> nil end}, state) + end + + def handle_cast({:insert_many, items, callback}, state) do + items_sorted = Enum.sort(items, fn (x, y) -> state.cmp.(x, y) == :after end) + new_state = insert_many(state, items_sorted, callback) + {:noreply, new_state} + end + + def handle_call({:read, qbegin, qlimit}, _from, state) do + begin = qbegin || state.top + limit = qlimit || 20 + items = get_items_list(state, begin, limit) + {:reply, items, state} + end + + def handle_call(:top, _from, state) do + {:reply, state.top, state} + end + + def handle_call(:root, _from, state) do + {:reply, state.root, state} + end + + def handle_call({:has, hash}, _from, state) do + {:reply, Map.has_key?(state.store, hash), state} + end + + defp get_items_list(state, begin, limit) do + case limit do + 0 -> {:ok, [], begin} + _ -> + case Map.fetch(state.store, begin) do + {:ok, :root} -> + {:ok, [], nil } + {:ok, {item, next}} -> + case get_items_list(state, next, limit - 1) do + {:ok, rest, past} -> + {:ok, [ item | rest ], past } + {:error, reason} -> {:error, reason} + end + :error -> {:error, begin} + end + end + end + + @doc """ + Compare function for timestamped strings + """ + def cmp_ts_str({ts1, str1}, {ts2, str2}) do + cond do + ts1 > ts2 -> :after + ts1 < ts2 -> :before + str1 == str2 -> :duplicate + str1 > str2 -> :after + str1 < str2 -> :before + end + end +end diff --git a/lib/identity.ex b/lib/identity.ex new file mode 100644 index 0000000..229d6c7 --- /dev/null +++ b/lib/identity.ex @@ -0,0 +1,32 @@ +defmodule Shard.Identity do + use Agent + require Salty.Sign.Ed25519, as: Sign + + def start_link(_) do + Agent.start_link(__MODULE__, :init, [], name: __MODULE__) + end + + def init() do + {:ok, pk, sk} = Sign.keypair + nick_suffix = pk + |> binary_part(0, 3) + |> Base.encode16 + |> String.downcase + %{ + keypair: {pk, sk}, + nickname: "Anon" <> nick_suffix, + } + end + + def get_keypair() do + Agent.get(__MODULE__, &(&1.keypair)) + end + + def get_nickname() do + Agent.get(__MODULE__, &(&1.nickname)) + end + + def set_nickname(newnick) do + Agent.update(__MODULE__, &(%{&1 | nickname: newnick})) + end +end diff --git a/lib/net/tcpconn.ex b/lib/net/tcpconn.ex new file mode 100644 index 0000000..5d6c912 --- /dev/null +++ b/lib/net/tcpconn.ex @@ -0,0 +1,134 @@ +defmodule SNet.TCPConn do + use GenServer, restart: :temporary + require Salty.Box.Curve25519xchacha20poly1305, as: Box + require Salty.Sign.Ed25519, as: Sign + require Logger + + def start_link(state) do + GenServer.start_link(__MODULE__, state) + end + + def init(state) do + GenServer.cast(self(), :handshake) + {:ok, state} + end + + def handle_call(:get_host_str, _from, state) do + {:reply, "#{state.his_pkey|>Base.encode16|>String.downcase}@#{to_string(:inet_parse.ntoa(state.addr))}:#{state.port}", state} + end + + def handle_cast(:handshake, state) do + socket = state.socket + + {srv_pkey, srv_skey} = Shard.Identity.get_keypair + {:ok, sess_pkey, sess_skey} = Box.keypair + {:ok, challenge} = Salty.Random.buf 32 + + # Exchange public keys and challenge + :gen_tcp.send(socket, srv_pkey <> sess_pkey <> challenge) + {:ok, pkt} = :gen_tcp.recv(socket, 0) + cli_pkey = binary_part(pkt, 0, Sign.publickeybytes) + cli_sess_pkey = binary_part(pkt, Sign.publickeybytes, Box.publickeybytes) + cli_challenge = binary_part(pkt, Sign.publickeybytes + Box.publickeybytes, 32) + + # Do challenge and check their challenge + {:ok, cli_challenge_sign} = Sign.sign_detached(cli_challenge, srv_skey) + pkt = encode_pkt(cli_challenge_sign, cli_sess_pkey, sess_skey) + :gen_tcp.send(socket, pkt) + + {:ok, pkt} = :gen_tcp.recv(socket, 0) + challenge_sign = decode_pkt(pkt, cli_sess_pkey, sess_skey) + :ok = Sign.verify_detached(challenge_sign, challenge, cli_pkey) + + # Connected + :inet.setopts(socket, [active: true]) + + {:ok, {addr, port}} = :inet.peername socket + state =%{ socket: socket, + my_pkey: srv_pkey, + my_skey: srv_skey, + his_pkey: cli_pkey, + conn_my_pkey: sess_pkey, + conn_my_skey: sess_skey, + conn_his_pkey: cli_sess_pkey, + addr: addr, + port: port + } + Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}" + + GenServer.cast(self(), :init_push) + + {:noreply, state} + end + + def handle_cast({:send_msg, msg}, state) do + send_msg(state, msg) + {:noreply, state} + end + + def handle_cast(:init_push, state) do + push_messages(state, nil, 10) + {:noreply, state} + end + + defp encode_pkt(pkt, pk, sk) do + {:ok, n} = Salty.Random.buf Box.noncebytes + {:ok, msg} = Box.easy(pkt, n, pk, sk) + n <> msg + end + + defp decode_pkt(pkt, pk, sk) do + n = binary_part(pkt, 0, Box.noncebytes) + enc = binary_part(pkt, Box.noncebytes, (byte_size pkt) - Box.noncebytes) + {:ok, msg} = Box.open_easy(enc, n, pk, sk) + msg + end + + defp send_msg(state, msg) do + msgbin = :erlang.term_to_binary msg + enc = encode_pkt(msgbin, state.conn_his_pkey, state.conn_my_skey) + :gen_tcp.send(state.socket, enc) + end + + def handle_info({:tcp, _socket, raw_data}, state) do + msg = decode_pkt(raw_data, state.conn_his_pkey, state.conn_my_skey) + handle_packet(:erlang.binary_to_term(msg, [:safe]), state) + {:noreply, state} + end + + def handle_info({:tcp_closed, _socket}, state) do + Logger.info "Disconnected: #{print_id state} at #{inspect state.addr}:#{state.port}" + exit(:normal) + end + + defp push_messages(state, start, num) do + case GenServer.call(SApp.Chat.Log, {:read, start, num}) do + {:ok, list, rest} -> + send_msg(state, {:info, start, list, rest}) + _ -> nil + end + end + + defp handle_packet(msg, state) do + # Logger.info "Message: #{inspect msg}" + case msg do + :get_top -> push_messages(state, nil, 10) + {:get, start} -> push_messages(state, start, 20) + {:info, _start, list, rest} -> + if rest != nil and not GenServer.call(SApp.Chat.Log, {:has, rest}) do + send_msg(state, {:get, rest}) + end + spawn_link(fn -> + Process.sleep 1000 + GenServer.cast(SApp.Chat.Log, {:insert_many, list, &SApp.Chat.msg_callback/1}) + end) + end + end + + defp print_id(state) do + state.his_pkey + |> binary_part(0, 8) + |> Base.encode16 + |> String.downcase + end +end diff --git a/lib/net/tcpserver.ex b/lib/net/tcpserver.ex new file mode 100644 index 0000000..e5ee996 --- /dev/null +++ b/lib/net/tcpserver.ex @@ -0,0 +1,35 @@ +defmodule SNet.TCPServer do + require Logger + use Task, restart: :permanent + + def start_link(port) do + Task.start_link(__MODULE__, :accept, [port]) + end + + + @doc """ + Starts accepting connections on the given `port`. + """ + def accept(port) do + {:ok, socket} = :gen_tcp.listen(port, + [:binary, packet: 2, active: false, reuseaddr: true]) + Logger.info "Accepting connections on port #{port}" + loop_acceptor(socket) + end + + defp loop_acceptor(socket) do + {:ok, client} = :gen_tcp.accept(socket) + {:ok, pid} = DynamicSupervisor.start_child(SNet.ConnSupervisor, {SNet.TCPConn, %{socket: client}}) + :ok = :gen_tcp.controlling_process(client, pid) + loop_acceptor(socket) + end + + def add_peer(ip, port) do + {:ok, client} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) + {:ok, pid} = DynamicSupervisor.start_child(SNet.ConnSupervisor, {SNet.TCPConn, %{socket: client}}) + :ok = :gen_tcp.controlling_process(client, pid) + pid + end + +end + diff --git a/lib/web/httprouter.ex b/lib/web/httprouter.ex new file mode 100644 index 0000000..5027df4 --- /dev/null +++ b/lib/web/httprouter.ex @@ -0,0 +1,53 @@ +defmodule SWeb.HTTPRouter do + use Plug.Router + use Plug.ErrorHandler + + plug Plug.Parsers, parsers: [:urlencoded, :multipart] + + plug :match + plug :dispatch + + get "/" do + main_page(conn) + end + + post "/" do + if Map.has_key?(conn.params, "msg") do + SApp.Chat.send(conn.params["msg"]) + end + if Map.has_key?(conn.params, "nick") do + Shard.Identity.set_nickname(conn.params["nick"]) + end + if Map.has_key?(conn.params, "peer") do + [ipstr, portstr] = String.split(conn.params["peer"], ":") + {:ok, ip} = :inet.parse_address (to_charlist ipstr) + {port, _} = Integer.parse portstr + SNet.TCPServer.add_peer(ip, port) + end + + main_page(conn) + end + + match _ do + send_resp(conn, 404, "Oops!") + end + + def main_page(conn) do + {:ok, messages, _} = GenServer.call(SApp.Chat.Log, {:read, nil, 42}) + + msgtxt = messages + |> Enum.map(fn {ts, nick, msg} -> "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}\n" end) + + peerlist = SNet.ConnSupervisor + |> DynamicSupervisor.which_children + |> Enum.map(fn {_, pid, _, _} -> "#{GenServer.call(pid, :get_host_str)}\n" end) + + conn + |> put_resp_content_type("text/html") + |> send_resp(200, "<pre>#{msgtxt}</pre>" <> + "<form method=POST><input type=text name=msg /><input type=submit value=send /></form>" <> + "<form method=POST><input type=text name=nick value=\"#{Shard.Identity.get_nickname}\" /><input type=submit value=\"change nick\" /></form>" <> + "<hr/><pre>#{peerlist}</pre>" <> + "<form method=POST><input type=text name=peer /><input type=submit value=\"add peer\" /></form>") + end +end |