From c6ec33d6e612168e14d77007915a4ea423c55a2e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sat, 1 Sep 2018 16:06:23 +0200 Subject: Move everything to subdirectory --- .formatter.exs | 4 - .gitignore | 26 --- README.md | 16 -- config/config.exs | 42 ---- lib/app/blockstore.ex | 149 -------------- lib/app/chat.ex | 208 -------------------- lib/application.ex | 33 ---- lib/cli/cli.ex | 98 ---------- lib/data/data.ex | 49 ----- lib/data/merklelist.ex | 143 -------------- lib/data/merklesearchtree.ex | 387 ------------------------------------- lib/data/store.ex | 91 --------- lib/identity.ex | 46 ----- lib/manager.ex | 243 ----------------------- lib/net/tcpconn.ex | 106 ---------- lib/net/tcpserver.ex | 26 --- mix.exs | 33 ---- mix.lock | 20 -- shard/.formatter.exs | 4 + shard/.gitignore | 26 +++ shard/README.md | 16 ++ shard/config/config.exs | 42 ++++ shard/lib/app/blockstore.ex | 149 ++++++++++++++ shard/lib/app/chat.ex | 208 ++++++++++++++++++++ shard/lib/application.ex | 33 ++++ shard/lib/cli/cli.ex | 98 ++++++++++ shard/lib/data/data.ex | 49 +++++ shard/lib/data/merklelist.ex | 143 ++++++++++++++ shard/lib/data/merklesearchtree.ex | 387 +++++++++++++++++++++++++++++++++++++ shard/lib/data/store.ex | 91 +++++++++ shard/lib/identity.ex | 46 +++++ shard/lib/manager.ex | 243 +++++++++++++++++++++++ shard/lib/net/tcpconn.ex | 106 ++++++++++ shard/lib/net/tcpserver.ex | 26 +++ shard/mix.exs | 33 ++++ shard/mix.lock | 20 ++ shard/test/conn_test.exs | 62 ++++++ shard/test/mkllst_test.exs | 18 ++ shard/test/mst_test.exs | 197 +++++++++++++++++++ shard/test/test_helper.exs | 8 + test/conn_test.exs | 62 ------ test/mkllst_test.exs | 18 -- test/mst_test.exs | 197 ------------------- test/test_helper.exs | 8 - 44 files changed, 2005 insertions(+), 2005 deletions(-) delete mode 100644 .formatter.exs delete mode 100644 .gitignore delete mode 100644 README.md delete mode 100644 config/config.exs delete mode 100644 lib/app/blockstore.ex delete mode 100644 lib/app/chat.ex delete mode 100644 lib/application.ex delete mode 100644 lib/cli/cli.ex delete mode 100644 lib/data/data.ex delete mode 100644 lib/data/merklelist.ex delete mode 100644 lib/data/merklesearchtree.ex delete mode 100644 lib/data/store.ex delete mode 100644 lib/identity.ex delete mode 100644 lib/manager.ex delete mode 100644 lib/net/tcpconn.ex delete mode 100644 lib/net/tcpserver.ex delete mode 100644 mix.exs delete mode 100644 mix.lock create mode 100644 shard/.formatter.exs create mode 100644 shard/.gitignore create mode 100644 shard/README.md create mode 100644 shard/config/config.exs create mode 100644 shard/lib/app/blockstore.ex create mode 100644 shard/lib/app/chat.ex create mode 100644 shard/lib/application.ex create mode 100644 shard/lib/cli/cli.ex create mode 100644 shard/lib/data/data.ex create mode 100644 shard/lib/data/merklelist.ex create mode 100644 shard/lib/data/merklesearchtree.ex create mode 100644 shard/lib/data/store.ex create mode 100644 shard/lib/identity.ex create mode 100644 shard/lib/manager.ex create mode 100644 shard/lib/net/tcpconn.ex create mode 100644 shard/lib/net/tcpserver.ex create mode 100644 shard/mix.exs create mode 100644 shard/mix.lock create mode 100644 shard/test/conn_test.exs create mode 100644 shard/test/mkllst_test.exs create mode 100644 shard/test/mst_test.exs create mode 100644 shard/test/test_helper.exs delete mode 100644 test/conn_test.exs delete mode 100644 test/mkllst_test.exs delete mode 100644 test/mst_test.exs delete mode 100644 test/test_helper.exs diff --git a/.formatter.exs b/.formatter.exs deleted file mode 100644 index 525446d..0000000 --- a/.formatter.exs +++ /dev/null @@ -1,4 +0,0 @@ -# Used by "mix format" -[ - inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"] -] diff --git a/.gitignore b/.gitignore deleted file mode 100644 index ed31458..0000000 --- a/.gitignore +++ /dev/null @@ -1,26 +0,0 @@ -# The directory Mix will write compiled artifacts to. -/_build/ - -# If you run "mix test --cover", coverage assets end up here. -/cover/ - -# The directory Mix downloads your dependencies sources to. -/deps/ - -# Where 3rd-party dependencies like ExDoc output generated docs. -/doc/ - -# Ignore .fetch files in case you like to edit your project deps locally. -/.fetch - -# If the VM crashes, it generates a dump, let's ignore it too. -erl_crash.dump - -# Also ignore archive artifacts (built via "mix archive.build"). -*.ez - -# Ignore package tarball (built via "mix hex.build"). -shard-*.tar - -# vim files -*.swp diff --git a/README.md b/README.md deleted file mode 100644 index 7abb1ce..0000000 --- a/README.md +++ /dev/null @@ -1,16 +0,0 @@ -# Shard - -Tests of peer-to-peer stuff. Right now it's a little chat application with replication of message history over peers. Nothing is secure, contrary to the name of the project. - -## Installation - -Download and install [Elixir](https://elixir-lang.org/). - -``` -mix deps.get -mix compile -mix run --no-halt -``` - -P2P port is 4044 by default, can be changed by setting the `$PORT` environment variable. HTTP interface is on port `$PORT`+1000. - diff --git a/config/config.exs b/config/config.exs deleted file mode 100644 index f970078..0000000 --- a/config/config.exs +++ /dev/null @@ -1,42 +0,0 @@ -# This file is responsible for configuring your application -# and its dependencies with the aid of the Mix.Config module. -use Mix.Config - -# This configuration is loaded before any dependency and is restricted -# to this project. If another project depends on this project, this -# file won't be loaded nor affect the parent project. For this reason, -# if you want to provide default values for your application for -# 3rd-party users, it should be done in your "mix.exs" file. - -# You can configure your application as: -# -# config :shard, key: :value -# -# and access this configuration in your application as: -# -# Application.get_env(:shard, :key) -# -# You can also configure a 3rd-party app: -# -# config :logger, level: :info -# - - -# Peer id suffix -# ============== -# This Shard instance will only connect to other instances that use -# the same suffix. -# -# On first run, the instance will try to generate a peer id that -# has this suffix. This is done by brute-force testing, therefore -# it is not recommended to use long suffixes. -config :shard, peer_id_suffix: "S" - - -# It is also possible to import configuration files, relative to this -# directory. For example, you can emulate configuration per environment -# by uncommenting the line below and defining dev.exs, test.exs and such. -# Configuration from the imported file will override the ones defined -# here (which is why it is important to import them last). -# -# import_config "#{Mix.env}.exs" diff --git a/lib/app/blockstore.ex b/lib/app/blockstore.ex deleted file mode 100644 index 8e4fddc..0000000 --- a/lib/app/blockstore.ex +++ /dev/null @@ -1,149 +0,0 @@ -defmodule SApp.BlockStore do - @moduledoc """ - A module that implements a content-adressable storage (blocks, or pages, - identified by the hash of their contents). - - This is not a shard, it is a side process that a shard may use to store its data. - - TODO: WIP - """ - - use GenServer - - @enforce_keys [:pid] - defstruct [:pid, :prefer_ask] - - - defmodule State do - defstruct [:shard_id, :path, :store, :reqs, :retries] - end - - - def start_link(shard_id, path) do - GenServer.start_link(__MODULE__, [shard_id, path]) - end - - def init([shard_id, path]) do - Shard.Manager.dispatch_to(shard_id, path, self()) - - {:ok, %State{shard_id: shard_id, path: path, store: %{}, reqs: %{}, retries: %{}}} - end - - def handle_call({:get, key, prefer_ask}, from, state) do - case state.store[key] do - nil -> - case prefer_ask do - [_ | _] -> - for peer <- prefer_ask do - Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) - end - _ -> - ask_random_peers(state, key) - end - reqs_key = case state.reqs[key] do - nil -> - MapSet.put(MapSet.new(), from) - ms -> - MapSet.put(ms, from) - end - state = put_in(state.reqs[key], reqs_key) - {:noreply, state} - v -> - {:reply, v, state} - end - end - - def handle_call({:put, val}, _from, state) do - hash = SData.term_hash val - state = %{state | store: Map.put(state.store, hash, val)} - {:reply, hash, state} - end - - def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do - state = case msg do - {:get, key} -> - case state.store[key] do - nil -> - Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}}) - v -> - Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, v}}) - end - state - {:info, hash, value} -> - if SData.term_hash value == hash do - reqs = case state.reqs[hash] do - nil -> state.reqs - pids -> - for pid <- pids do - GenServer.reply(pid, value) - end - Map.delete(state.reqs, hash) - end - state = %{state | retries: Map.delete(state.retries, hash)} - %{state | store: Map.put(state.store, hash, value), reqs: reqs} - else - state - end - {:not_found, key} -> - if state.reqs[key] != nil and state.store[key] == nil do - nretry = case state.retries[key] do - nil -> 1 - n -> n+1 - end - if nretry < 3 do - ask_random_peers(state, key) - %{state | retries: Map.put(state.retries, key, nretry)} - else - for pid <- state.reqs[key] do - GenServer.reply(pid, nil) - end - state = %{state | reqs: Map.delete(state.reqs, key)} - state = %{state | retries: Map.delete(state.retries, key)} - state - end - else - state - end - end - {:noreply, state} - end - - def ask_random_peers(state, key) do - peers = :ets.lookup(:shard_peer_db, state.shard_id) - |> Enum.shuffle - |> Enum.take(3) - for {_, peer} <- peers do - Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) - end - end - - - defimpl SData.PageStore do - def put(store, page) do - hash = GenServer.call(store.pid, {:put, page}) - { hash, store } - end - - def get(store, hash) do - try do - GenServer.call(store.pid, {:get, hash, store.prefer_ask}) - catch - :exit, {:timeout, _} -> nil - end - end - - def copy(store, other_store, hash) do - page = SData.PageStore.get(other_store, hash) - refs = SData.Page.refs(page) - for ref <- refs do - copy(store, other_store, ref) - end - GenServer.call(store.pid, {:put, page}) - store - end - - def free(store, _hash) do - store ## DO SOMETHING??? - end - end -end diff --git a/lib/app/chat.ex b/lib/app/chat.ex deleted file mode 100644 index 051fab6..0000000 --- a/lib/app/chat.ex +++ /dev/null @@ -1,208 +0,0 @@ -defmodule SApp.Chat do - @moduledoc """ - Shard application for a replicated chat room with full history. - - Chat rooms are globally identified by their channel name. - A chat room manifest is of the form: - - {:chat, channel_name} - - Future improvements: - - message signing - - storage of the chatroom messages to disk - - storage of the known peers that have this channel to disk - - use a DHT to find peers that are interested in this channel - - epidemic broadcast (carefull not to be too costly, - maybe by limiting the number of peers we talk to) - - partial synchronization only == data distributed over peers - """ - - use GenServer - - require Logger - alias SData.MerkleSearchTree, as: MST - - @doc """ - Start a process that connects to a given channel - """ - def start_link(channel) do - GenServer.start_link(__MODULE__, channel) - end - - @doc """ - Initialize channel process. - """ - def init(channel) do - manifest = {:chat, channel} - id = SData.term_hash manifest - - case Shard.Manager.register(id, manifest, self()) do - :ok -> - Shard.Manager.dispatch_to(id, nil, self()) - {:ok, block_store} = SApp.BlockStore.start_link(id, :block_store) - mst = %MST{store: %SApp.BlockStore{pid: block_store}, - cmp: &msg_cmp/2} - GenServer.cast(self(), :init_pull) - {:ok, - %{channel: channel, - id: id, - manifest: manifest, - block_store: block_store, - mst: mst, - subs: MapSet.new, - } - } - :redundant -> - exit(:redundant) - end - end - - @doc """ - Implementation of the :manifest call that returns the chat room's manifest - """ - def handle_call(:manifest, _from, state) do - {:reply, state.manifest, state} - end - - def handle_call({:read_history, top_bound, num}, _from, state) do - ret = MST.last(state.mst, top_bound, num) - {:reply, ret, state} - end - - @doc """ - Implementation of the :init_pull handler, which is called when the - process starts. It contacts all currently connected peers and asks them to - send data for this channel if they have some. - """ - def handle_cast(:init_pull, state) do - for {_, pid, _, _} <- :ets.tab2list(:peer_db) do - GenServer.cast(pid, {:send_msg, {:interested, [state.id]}}) - end - {:noreply, state} - end - - @doc """ - Implementation of the :chat_send handler. This is the main handler that is used - to send a message to the chat room. Puts the message in the store and syncs - with all connected peers. - """ - def handle_cast({:chat_send, msg}, state) do - msgitem = {(System.os_time :seconds), - Shard.Identity.get_nickname(), - msg} - prev_root = state.mst.root - mst = MST.insert(state.mst, msgitem) - state = %{state | mst: mst} - - for pid <- state.subs do - if Process.alive?(pid) do - send(pid, {:chat_send, state.channel, msgitem}) - end - end - - notif = {:append, prev_root, msgitem, mst.root} - for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do - Shard.Manager.send(peer_id, {state.id, nil, notif}) - end - - {:noreply, state} - end - - @doc """ - Implementation of the :interested handler, this is called when a peer we are - connected to asks to recieve data for this channel. - """ - def handle_cast({:interested, peer_id}, state) do - Shard.Manager.send(peer_id, {state.id, nil, {:root, state.mst.root}}) - {:noreply, state} - end - - def handle_cast({:subscribe, pid}, state) do - Process.monitor(pid) - new_subs = MapSet.put(state.subs, pid) - {:noreply, %{ state | subs: new_subs }} - end - - @doc """ - Implementation of the :msg handler, which is the main handler for messages - comming from other peers concerning this chat room. - - Messages are: - - `{:get, start}`: get some messages starting at a given Merkle hash - - `{:info, start, list, rest}`: put some messages and informs of the - Merkle hash of the store of older messages. - """ - def handle_cast({:msg, peer_id, _shard_id, nil, msg}, state) do - state = case msg do - {:get_manifest} -> - Shard.Manager.send(peer_id, {state.id, nil, {:manifest, state.manifest}}) - state - {:append, prev_root, msgitem, new_root} -> - # Append message: one single mesage has arrived - if new_root == state.mst.root do - # We already have the message, do nothing - state - else - # Try adding the message - if prev_root == state.mst.root do - mst2 = MST.insert(state.mst, msgitem) - if mst2.root == new_root do - # This was the only message missing, we are happy! - state = %{state | mst: mst2} - msg_callback(state, msgitem) - state - else - # More messages are missing, start a full merge - init_merge(state, new_root, peer_id) - end - else - init_merge(state, new_root, peer_id) - end - end - {:root, new_root} -> - if new_root == state.mst.root do - # already up to date, ignore - state - else - init_merge(state, new_root, peer_id) - end - x -> - Logger.info("Unhandled message: #{inspect x}") - state - end - {:noreply, state} - end - - defp init_merge(state, new_root, source_peer) do - # TODO: make the merge asynchronous - mgmst = %{state.mst | root: new_root} - mgmst = put_in(mgmst.store.prefer_ask, [source_peer]) - mst = MST.merge(state.mst, mgmst, fn msgitem, true -> msg_callback(state, msgitem) end) - %{state | mst: mst} - end - - def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do - new_subs = MapSet.delete(state.subs, pid) - {:noreply, %{ state | subs: new_subs }} - end - - defp msg_callback(state, {ts, nick, msg}) do - for pid <- state.subs do - if Process.alive?(pid) do - send(pid, {:chat_recv, state.channel, {ts, nick, msg}}) - end - end - end - - defp msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do - cond do - ts1 > ts2 -> :after - ts1 < ts2 -> :before - nick1 > nick2 -> :after - nick1 < nick2 -> :before - msg1 > msg2 -> :after - msg1 < msg2 -> :before - true -> :duplicate - end - end -end diff --git a/lib/application.ex b/lib/application.ex deleted file mode 100644 index 3e3a6ac..0000000 --- a/lib/application.ex +++ /dev/null @@ -1,33 +0,0 @@ -defmodule Shard.Application do - @moduledoc """ - Main Shard application. - - Shard is a prototype peer-to-peer comunication platform with data - synchronization. - """ - - use Application - - def start(_type, _args) do - import Supervisor.Spec, warn: false - - {listen_port, _} = Integer.parse ((System.get_env "PORT") || "4044") - - # Define workers and child supervisors to be supervised - children = [ - Shard.Identity, - { DynamicSupervisor, strategy: :one_for_one, name: Shard.DynamicSupervisor }, - - # Networking - { SNet.TCPServer, listen_port }, - - # Applications & data store - { Shard.Manager, listen_port }, - ] - - # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html - # for other strategies and supported options - opts = [strategy: :one_for_one, name: Shard.Supervisor] - Supervisor.start_link(children, opts) - end -end diff --git a/lib/cli/cli.ex b/lib/cli/cli.ex deleted file mode 100644 index 2fbf8c2..0000000 --- a/lib/cli/cli.ex +++ /dev/null @@ -1,98 +0,0 @@ -defmodule SCLI do - @moduledoc """ - Small command line interface for the chat application - """ - - def run() do - run(nil) - end - - defp run(pid) do - handle_messages() - - nick = Shard.Identity.get_nickname - prompt = case pid do - nil -> "(no channel) #{nick}: " - _ -> - {:chat, chan} = GenServer.call(pid, :manifest) - "##{chan} #{nick}: " - end - - str = prompt |> IO.gets |> String.trim - cond do - str == "/quit" -> - nil - String.slice(str, 0..0) == "/" -> - command = str |> String.slice(1..-1) |> String.split(" ") - pid2 = handle_command(pid, command) - run(pid2) - true -> - if str != "" do - GenServer.cast(pid, {:chat_send, str}) - end - run(pid) - end - end - - defp handle_messages() do - receive do - {:chat_recv, chan, {ts, nick, msg}} -> - IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} ##{chan} <#{nick}> #{msg}" - handle_messages() - {:chat_send, _, _} -> - # do nothing - handle_messages() - after 10 -> nil - end - end - - defp handle_command(pid, ["connect", ipstr, portstr]) do - {:ok, ip} = :inet.parse_address (to_charlist ipstr) - {port, _} = Integer.parse portstr - Shard.Manager.add_peer(ip, port) - pid - end - - defp handle_command(pid, ["list"]) do - IO.puts "List of known channels:" - - for {_chid, manifest, _chpid} <- :ets.tab2list(:shard_db) do - {:chat, chan} = manifest - IO.puts "##{chan}" - end - pid - end - - defp handle_command(pid, ["hist"]) do - GenServer.call(pid, {:read_history, nil, 25}) - |> Enum.each(fn {{ts, nick, msg}, true} -> - IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}" - end) - pid - end - - defp handle_command(_pid, ["join", qchan]) do - list = for {_chid, manifest, chpid} <- :ets.tab2list(:shard_db), - {:chat, chan} = manifest, - do: {chan, chpid} - case List.keyfind(list, qchan, 0) do - nil -> - {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, qchan}) - GenServer.cast(pid, {:subscribe, self()}) - pid - {_, pid} -> - IO.puts "Switching to ##{qchan}" - pid - end - end - - defp handle_command(pid, ["nick", nick]) do - Shard.Identity.set_nickname nick - pid - end - - defp handle_command(pid, _cmd) do - IO.puts "Invalid command" - pid - end -end diff --git a/lib/data/data.ex b/lib/data/data.ex deleted file mode 100644 index c2c659d..0000000 --- a/lib/data/data.ex +++ /dev/null @@ -1,49 +0,0 @@ -defmodule SData do - @moduledoc """ - Utility functions - - Compare functions are functions that compares stored items and provides a total order. - They must return: - - `:after` if the first argument is more recent - - `:duplicate` if the two items are the same - - `:before` if the first argument is older - These functions must only return :duplicate for equal items. - """ - - @doc """ - Calculate the hash of an Erlang term by first converting it to its - binary representation. - """ - def term_hash(term, algo \\ :sha256) do - :crypto.hash(algo, (:erlang.term_to_binary term)) - end - - @doc""" - Compare function for arbitrary terms using the Erlang order - """ - def cmp_term(a, b) do - cond do - a > b -> :after - a < b -> :before - a == b -> :duplicate - end - end - - @doc""" - Compare function for timestamped strings - """ - def cmp_ts_str({ts1, str1}, {ts2, str2}) do - cond do - ts1 > ts2 -> :after - ts1 < ts2 -> :before - str1 > str2 -> :after - str1 < str2 -> :before - true -> :duplicate - end - end - - @doc""" - Merge function for nils - """ - def merge_true(true, true), do: true -end diff --git a/lib/data/merklelist.ex b/lib/data/merklelist.ex deleted file mode 100644 index 9b44ee8..0000000 --- a/lib/data/merklelist.ex +++ /dev/null @@ -1,143 +0,0 @@ -defmodule SData.MerkleList do - @moduledoc""" - A simple Merkle list store. - - Future improvements: - - When messages are inserted other than at the top, all intermediate hashes - change. Keep track of the mapping from old hashes to new hashes so that get - requests can work even for hashes that are not valid anymore. - - group items in "pages" (bigger bundles) - """ - - defstruct [:root, :top, :cmp, :store] - - @doc""" - Create a Merkle list store. - - `cmp` is a compare function that respects the interface defined in module `SData`. - """ - def new(cmp) do - root_item = :root - root_hash = SData.term_hash root_item - state = %SData.MerkleList{ - root: root_hash, - top: root_hash, - cmp: cmp, - store: %{ root_hash => root_item } - } - state - end - - defp push(state, item) do - new_item = {item, state.top} - new_item_hash = SData.term_hash new_item - new_store = Map.put(state.store, new_item_hash, new_item) - %{ state | :top => new_item_hash, :store => new_store } - end - - defp pop(state) do - if state.top == state.root do - :error - else - {item, next} = Map.get(state.store, state.top) - new_store = Map.delete(state.store, state.top) - new_state = %{ state | :top => next, :store => new_store } - {:ok, item, new_state} - end - end - - @doc""" - Insert a list of items in the store. - - A callback function may be specified that is called on any item - that is sucessfully added, i.e. that wasn't present in the store before. - """ - def insert_many(state, items, callback \\ (fn _ -> nil end)) do - items_sorted = Enum.sort(items, fn (x, y) -> state.cmp.(x, y) == :after end) - insert_many_aux(state, items_sorted, callback) - end - - defp insert_many_aux(state, [], _callback) do - state - end - - defp insert_many_aux(state, [item | rest], callback) do - case pop(state) do - :error -> - new_state = push(insert_many_aux(state, rest, callback), item) - callback.(item) - new_state - {:ok, front, state_rest} -> - case state.cmp.(item, front) do - :after -> - new_state = push(insert_many_aux(state, rest, callback), item) - callback.(item) - new_state - :duplicate -> insert_many_aux(state, rest, callback) - :before -> push(insert_many_aux(state_rest, [item | rest], callback), front) - end - end - end - - @doc""" - Insert a single item in the store. - - A callback function may be specified that is called on the item - if it is sucessfully added, i.e. it wasn't present in the store before. - """ - def insert(state, item, callback \\ (fn _ -> nil end)) do - insert_many(state, [item], callback) - end - - @doc""" - Read some items from the state. - - The two parameters are optional: - - qbegin : hash of the first item to read - - qlimit : number of items to read - """ - def read(state, qbegin \\ nil, qlimit \\ nil) do - begin = qbegin || state.top - limit = qlimit || 20 - get_items_list(state, begin, limit) - end - - @doc""" - Get the hash of the last item - """ - def top(state) do - state.top - end - - @doc""" - Get the hash of the root item - """ - def root(state) do - state.root - end - - @doc""" - Check if the store holds a certain item - """ - def has(state, hash) do - Map.has_key?(state.store, hash) - end - - defp get_items_list(state, begin, limit) do - case limit do - 0 -> {:ok, [], begin} - _ -> - case Map.fetch(state.store, begin) do - {:ok, :root} -> - {:ok, [], nil } - {:ok, {item, next}} -> - case get_items_list(state, next, limit - 1) do - {:ok, rest, past} -> - {:ok, [ item | rest ], past } - {:error, reason} -> {:error, reason} - end - :error -> {:error, begin} - end - end - end -end diff --git a/lib/data/merklesearchtree.ex b/lib/data/merklesearchtree.ex deleted file mode 100644 index 941d31d..0000000 --- a/lib/data/merklesearchtree.ex +++ /dev/null @@ -1,387 +0,0 @@ -defmodule SData.MerkleSearchTree do - @moduledoc""" - A Merkle search tree. - - A node of the tree is - { - level, - hash_of_node | nil, - [ - { item_low_bound, hash_of_node | nil }, - { item_low_bound, hash_of_node | nil }, - ... - } - } - """ - - alias SData.PageStore, as: Store - - @doc""" - Create a new Merkle search tree. - - This structure can be used as a set with only true keys, - or as a map if a merge function is given. - - `cmp` is a compare function for keys as defined in module `SData`. - - `merge` is a function for merging two items that have the same key. - """ - defstruct root: nil, - store: SData.LocalStore.new, - cmp: &SData.cmp_term/2, - merge: &SData.merge_true/2 - - - defmodule Page do - defstruct [:level, :low, :list] - end - - defimpl SData.Page, for: Page do - def refs(page) do - refs = for {_, _, h} <- page.list, h != nil, do: h - if page.low != nil do - [ page.low | refs ] - else - refs - end - end - end - - - @doc""" - Insert an item into the search tree. - """ - def insert(state, key, value \\ true) do - level = calc_level(key) - {hash, store} = insert_at(state, state.store, state.root, key, level, value) - %{ state | root: hash, store: store } - end - - defp insert_at(s, store, root, key, level, value) do - {new_page, store} = if root == nil do - { %Page{ level: level, low: nil, list: [ { key, value, nil } ] }, store } - else - %Page{ level: plevel, low: low, list: lst } = Store.get(store, root) - [ { k0, _, _} | _ ] = lst - cond do - plevel < level -> - {low, high, store} = split(s, store, root, key) - { %Page{ level: level, low: low, list: [ { key, value, high } ] }, store } - plevel == level -> - store = Store.free(store, root) - case s.cmp.(key, k0) do - :before -> - {low2a, low2b, store} = split(s, store, low, key) - { %Page{ level: level, low: low2a, list: [ { key, value, low2b } | lst] }, store } - _ -> - {new_lst, store} = aux_insert_after_first(s, store, lst, key, value) - { %Page{ level: plevel, low: low, list: new_lst }, store } - end - plevel > level -> - store = Store.free(store, root) - case s.cmp.(key, k0) do - :before -> - {new_low, store} = insert_at(s, store, low, key, level, value) - { %Page{ level: plevel, low: new_low, list: lst }, store } - :after -> - {new_lst, store} = aux_insert_sub_after_first(s, store, lst, key, level, value) - { %Page{ level: plevel, low: low, list: new_lst }, store } - end - end - end - Store.put(store, new_page) # returns {hash, store} - end - - defp split(s, store, hash, key) do - if hash == nil do - {nil, nil, store} - else - %Page{ level: level, low: low, list: lst } = Store.get(store, hash) || Store.get(s.store, hash) - store = Store.free(store, hash) - [ { k0, _, _} | _ ] = lst - case s.cmp.(key, k0) do - :before -> - {lowlow, lowhi, store} = split(s, store, low, key) - newp2 = %Page{ level: level, low: lowhi, list: lst } - {newp2h, store} = Store.put(store, newp2) - {lowlow, newp2h, store} - :after -> - {lst1, p2, store} = split_aux(s, store, lst, key, level) - newp1 = %Page{ level: level, low: low, list: lst1} - {newp1h, store} = Store.put(store, newp1) - {newp1h, p2, store} - end - end - end - - defp split_aux(s, store, lst, key, level) do - case lst do - [ {k1, v1, r1} ] -> - if s.cmp.(k1, key) == :duplicate do - raise "Bad logic" - end - {r1l, r1h, store} = split(s, store, r1, key) - { [{k1, v1, r1l}], r1h, store } - [ {k1, v1, r1} = fst, {k2, v2, r2} = rst1 | rst ] -> - case s.cmp.(key, k2) do - :before -> - {r1l, r1h, store} = split(s, store, r1, key) - p2 = %Page{ level: level, low: r1h, list: [ {k2, v2, r2} | rst ] } - {p2h, store} = Store.put(store, p2) - { [{k1, v1, r1l}], p2h, store } - :after -> - {rst2, hi, store} = split_aux(s, store, [rst1 | rst], key, level) - { [ fst | rst2 ], hi, store } - :duplicate -> - raise "Bad logic" - end - end - end - - defp aux_insert_after_first(s, store, lst, key, value) do - case lst do - [ {k1, v1, r1} ] -> - case s.cmp.(k1, key) do - :duplicate -> - { [ {k1, s.merge.(v1, value), r1} ], store } - :before -> - {r1a, r1b, new_store} = split(s, store, r1, key) - { [ {k1, v1, r1a}, {key, value, r1b} ], new_store } - end - [ {k1, v1, r1} = fst, {k2, v2, r2} = rst1 | rst ] -> - case s.cmp.(k1, key) do - :duplicate -> - { [ {k1, s.merge.(v1, value), r1}, rst1 | rst ], store } - :before -> - case s.cmp.(k2, key) do - :after -> - {r1a, r1b, new_store} = split(s, store, r1, key) - { [ {k1, v1, r1a}, {key, value, r1b}, {k2, v2, r2} | rst ], new_store } - _ -> - {rst2, new_store} = aux_insert_after_first(s, store, [rst1 | rst], key, value) - { [ fst | rst2 ], new_store } - end - end - end - end - - defp aux_insert_sub_after_first(s, store, lst, key, level, value) do - case lst do - [ {k1, v1, r1} ] -> - if s.cmp.(k1, key) == :duplicate do - raise "Bad logic" - end - {r1new, store_new} = insert_at(s, store, r1, key, level, value) - { [{k1, v1, r1new}], store_new } - [ {k1, v1, r1} = fst, {k2, _, _} = rst1 | rst ] -> - if s.cmp.(k1, key) == :duplicate do - raise "Bad logic" - end - case s.cmp.(key, k2) do - :before -> - {r1new, store_new} = insert_at(s, store, r1, key, level, value) - { [{k1, v1, r1new}, rst1 | rst], store_new } - _ -> - {rst2, new_store} = aux_insert_sub_after_first(s, store, [rst1 |rst], key, level, value) - { [ fst | rst2 ], new_store } - end - end - end - - @doc""" - Merge values from another MST in this MST. - - The merge is not symmetrical in the sense that: - - new pages are added in the store of the first argument - - the callback is called for all items found in the second argument and not the first - """ - def merge(to, from, callback \\ fn _, _ -> nil end) do - { store, root } = merge_aux(to, from, to.store, to.root, from.root, callback) - %{ to | store: store, root: root } - end - - defp merge_aux(s1, s2, store, r1, r2, callback) do - case {r1, r2} do - _ when r1 == r2 -> { store, r1 } - {_, nil} -> - { store, r1 } - {nil, _} -> - store = Store.copy(store, s2.store, r2) - rec_callback(store, r2, callback) - { store, r2 } - _ -> - %Page{ level: level1, low: low1, list: lst1 } = Store.get(store, r1) - %Page{ level: level2, low: low2, list: lst2 } = Store.get(store, r2) || Store.get(s2.store, r2) - { level, low1, lst1, low2, lst2 } = cond do - level1 == level2 -> {level1, low1, lst1, low2, lst2} - level1 > level2 -> {level1, low1, lst1, r2, []} - level2 > level1 -> {level2, r1, [], low2, lst2} - end - { store, low, lst } = merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback) - page = %Page{ level: level, low: low, list: lst } - {hash, store} = Store.put(store, page) - {store, hash} - end - end - - defp merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback) do - case {lst1, lst2} do - { [], [] } -> - {store, hash} = merge_aux(s1, s2, store, low1, low2, callback) - {store, hash, []} - { [], [ {k, v, r} | rst2 ] } -> - {low1l, low1h, store} = split(s1, store, low1, k) - {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback) - callback.(k, v) - {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, [], r, rst2, callback) - {store, newlow, [ {k, v, newr} | newrst ]} - { [ {k, v, r} | rst1 ], [] } -> - {low2l, low2h, store} = split(s2, store, low2, k) - {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback) - {store, newr, newrst} = merge_aux_rec(s1, s2, store, r, rst1, low2h, [], callback) - {store, newlow, [ {k, v, newr} | newrst ]} - { [ {k1, v1, r1} | rst1 ], [ {k2, v2, r2} | rst2 ] } -> - case s1.cmp.(k1, k2) do - :before -> - {low2l, low2h, store} = split(s2, store, low2, k1) - {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback) - {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, low2h, lst2, callback) - {store, newlow, [ {k1, v1, newr} | newrst ]} - :after -> - {low1l, low1h, store} = split(s1, store, low1, k2) - {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback) - callback.(k2, v2) - {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, lst1, r2, rst2, callback) - {store, newlow, [ {k2, v2, newr} | newrst ]} - :duplicate -> - {store, newlow} = merge_aux(s1, s2, store, low1, low2, callback) - newv = s1.merge.(v1, v2) ## TODO: callback here ?? - {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, r2, rst2, callback) - {store, newlow, [ {k1, newv, newr} | newrst ]} - end - end - end - - defp rec_callback(store, root, callback) do - case root do - nil -> nil - _ -> - %Page{ level: _, low: low, list: lst } = Store.get(store, root) - rec_callback(store, low, callback) - for {k, v, rst} <- lst do - callback.(k, v) - rec_callback(store, rst, callback) - end - end - end - - @doc""" - Get value for a specific key in search tree. - """ - def get(state, key) do - get(state, state.root, key) - end - - defp get(s, root, key) do - case root do - nil -> nil - _ -> - %Page{ level: _, low: low, list: lst } = Store.get(s.store, root) - get_aux(s, low, lst, key) - end - end - - defp get_aux(s, low, lst, key) do - case lst do - [] -> - get(s, low, key) - [ {k, v, low2} | rst ] -> - case s.cmp.(key, k) do - :duplicate -> v - :before -> - get(s, low, key) - :after -> - get_aux(s, low2, rst, key) - end - end - end - - @doc""" - Get the last n items of the tree, or the last n items - strictly before given upper bound if non nil - """ - def last(state, top_bound, num) do - last(state, state.root, top_bound, num) - end - - defp last(s, root, top_bound, num) do - case root do - nil -> [] - _ -> - %Page{ level: _, low: low, list: lst } = Store.get(s.store, root) - last_aux(s, low, lst, top_bound, num) - end - end - - defp last_aux(s, low, lst, top_bound, num) do - case lst do - [] -> - last(s, low, top_bound, num) - [ {k, v, low2} | rst ] -> - if top_bound == nil or s.cmp.(top_bound, k) == :after do - items = last_aux(s, low2, rst, top_bound, num) - items = if Enum.count(items) < num do - [ {k, v} | items ] - else - items - end - cnt = Enum.count items - if cnt < num do - last(s, low, top_bound, num - cnt) ++ items - else - items - end - else - last(s, low, top_bound, num) - end - end - end - - - @doc""" - Dump Merkle search tree structure. - """ - def dump(state) do - dump(state.store, state.root, "") - end - - defp dump(store, root, lvl) do - case root do - nil -> - IO.puts(lvl <> "nil") - _ -> - %Page{ level: level, low: low, list: lst} = Store.get(store, root) - IO.puts(lvl <> "#{root|>Base.encode16} (#{level})") - dump(store, low, lvl <> " ") - for {k, v, r} <- lst do - IO.puts(lvl<>"- #{inspect k} => #{inspect v}") - dump(store, r, lvl <> " ") - end - end - end - - defp calc_level(key) do - key - |> SData.term_hash - |> Base.encode16 - |> String.to_charlist - |> count_leading_zeroes - end - - defp count_leading_zeroes('0' ++ rest) do - 1 + count_leading_zeroes(rest) - end - defp count_leading_zeroes(_) do - 0 - end -end diff --git a/lib/data/store.ex b/lib/data/store.ex deleted file mode 100644 index ca12cd0..0000000 --- a/lib/data/store.ex +++ /dev/null @@ -1,91 +0,0 @@ -defprotocol SData.Page do - @moduledoc""" - Protocol to be implemented by objects that are used as data pages - in a pagestore and that may reference other data pages by their hash. - """ - - @fallback_to_any true - - @doc""" - Get hashes of all pages referenced by this page. - """ - def refs(page) -end - -defimpl SData.Page, for: Any do - def refs(_page), do: [] -end - - -defprotocol SData.PageStore do - @moduledoc""" - Protocol to be implemented for page stores to allow their - manipulation. - - This protocol may also be implemented by store proxies that track - operations and implement different synchronization or caching mechanisms. - """ - - @doc""" - Put a page. Argument is the content of the page, returns the - hash that the store has associated to it. - - Returns {hash, store} - """ - def put(store, page) - - @doc""" - Get a page referenced by its hash. - - Returns page - """ - def get(store, hash) - - @doc""" - Copy to the store a page and all its references from the other store. - In the case of pages on the network in a distributed store, this may - be lazy. - - Returns store - """ - def copy(store, other_store, hash) - - @doc""" - Free a page referenced by its hash, marking it as no longer needed. - - Returns store - """ - def free(store, hash) -end - - -defmodule SData.LocalStore do - defstruct [:pages] - - def new() do - %SData.LocalStore{ pages: %{} } - end -end - -defimpl SData.PageStore, for: SData.LocalStore do - def put(store, page) do - hash = SData.term_hash page - store = %{ store | pages: Map.put(store.pages, hash, page) } - { hash, store } - end - - def get(store, hash) do - store.pages[hash] - end - - def copy(store, other_store, hash) do - page = SData.PageStore.get(other_store, hash) - refs = SData.Page.refs(page) - store = Enum.reduce(refs, store, fn x, acc -> copy(acc, other_store, x) end) - %{ store | pages: Map.put(store.pages, hash, page) } - end - - def free(store, hash) do - %{ store | pages: Map.delete(store.pages, hash) } - end -end diff --git a/lib/identity.ex b/lib/identity.ex deleted file mode 100644 index f1899df..0000000 --- a/lib/identity.ex +++ /dev/null @@ -1,46 +0,0 @@ -defmodule Shard.Identity do - use Agent - require Salty.Sign.Ed25519, as: Sign - require Logger - - def start_link(_) do - Agent.start_link(__MODULE__, :init, [], name: __MODULE__) - end - - def init() do - Logger.info "Generating keypair..." - {pk, sk} = gen_keypair(Application.get_env(:shard, :peer_id_suffix)) - nick_suffix = pk - |> binary_part(0, 3) - |> Base.encode16 - |> String.downcase - %{ - keypair: {pk, sk}, - nickname: "Anon" <> nick_suffix, - } - end - - defp gen_keypair(suffix, n \\ 0) do - {:ok, pk, sk} = Sign.keypair - if rem(n, 10000) == 0 do - Logger.info "#{n}... expected #{:math.pow(256, byte_size(suffix))}" - end - if :binary.longest_common_suffix([pk, suffix]) == byte_size(suffix) do - {pk, sk} - else - gen_keypair(suffix, n+1) - end - end - - def get_keypair() do - Agent.get(__MODULE__, &(&1.keypair)) - end - - def get_nickname() do - Agent.get(__MODULE__, &(&1.nickname)) - end - - def set_nickname(newnick) do - Agent.update(__MODULE__, &(%{&1 | nickname: newnick})) - end -end diff --git a/lib/manager.ex b/lib/manager.ex deleted file mode 100644 index 82984d6..0000000 --- a/lib/manager.ex +++ /dev/null @@ -1,243 +0,0 @@ -defmodule Shard.Manager do - @moduledoc""" - Maintains several important tables : - - - :peer_db - - List of - { id, {conn_pid, con_start, conn_n_msg} | nil, ip, port, last_seen } - - - :shard_db - - List of - { id, manifest, pid | nil } - - - :shard_procs - - List of - { {id, path}, pid } - - - :shard_peer_db - - Mult-list of - { shard_id, peer_id } - - - And an internal table : - - - :outbox - - Multi-list of - { peer_id, message } - - """ - - use GenServer - - require Logger - - def start_link(my_port) do - GenServer.start_link(__MODULE__, my_port, name: __MODULE__) - end - - def init(my_port) do - :ets.new(:peer_db, [:set, :protected, :named_table]) - :ets.new(:shard_db, [:set, :protected, :named_table]) - :ets.new(:shard_procs, [:set, :protected, :named_table]) - :ets.new(:shard_peer_db, [:bag, :protected, :named_table]) - outbox = :ets.new(:outbox, [:bag, :private]) - - {:ok, %{my_port: my_port, outbox: outbox} } - end - - def handle_call({:register, shard_id, manifest, pid}, _from, state) do - will_live = case :ets.lookup(:shard_db, shard_id) do - [{ ^shard_id, _, pid }] -> not Process.alive?(pid) - _ -> true - end - reply = if will_live do - Process.monitor(pid) - :ets.insert(:shard_db, {shard_id, manifest, pid}) - :ok - else - :redundant - end - {:reply, reply, state} - end - - def handle_cast({:dispatch_to, shard_id, path, pid}, state) do - :ets.insert(:shard_procs, { {shard_id, path}, pid }) - Process.monitor(pid) - {:noreply, state} - end - - def handle_cast({:interested, peer_id, shards}, state) do - for shard_id <- shards do - case :ets.lookup(:shard_db, shard_id) do - [{ ^shard_id, _, pid }] -> - :ets.insert(:shard_peer_db, {shard_id, peer_id}) - GenServer.cast(pid, {:interested, peer_id}) - [] -> nil - end - end - {:noreply, state} - end - - def handle_cast({:not_interested, peer_id, shard_id}, state) do - :ets.match_delete(:shard_peer_db, {shard_id, peer_id}) - {:noreply, state} - end - - def handle_cast({:shard_peer_db_insert, shard_id, peer_id}, state) do - :ets.insert(:shard_peer_db, {shard_id, peer_id}) - {:noreply, state} - end - - def handle_cast({:peer_up, pk, pid, ip, port}, state) do - for [pk2] <- :ets.match(:peer_db, {:'$1', :_, ip, port}) do - if pk2 != pk do - # obsolete peer information - :ets.delete(:peer_db, pk2) - :ets.match_delete(:shard_peer_db, {:_, pk2}) - end - end - :ets.insert(:peer_db, {pk, pid, ip, port}) - - # Send interested message for all our shards - id_list = (for {id, _, _} <- :ets.tab2list(:shard_db), do: id) - GenServer.cast(pid, {:send_msg, {:interested, id_list}}) - - # Send queued messages - for {_, msg, _} <- :ets.lookup(state.outbox, pk) do - GenServer.cast(pid, {:send_msg, msg}) - end - :ets.delete(state.outbox, pk) - - {:noreply, state} - end - - def handle_cast({:peer_down, pk, ip, port}, state) do - :ets.insert(:peer_db, {pk, nil, ip, port}) - {:noreply, state} - end - - def handle_cast({:connect_and_send, peer_id, msg}, state) do - case :ets.lookup(:peer_db, peer_id) do - [{^peer_id, nil, ip, port}] -> - add_peer(ip, port, state) - currtime = System.os_time :second - :ets.insert(state.outbox, {peer_id, msg, currtime}) - outbox_cleanup = [{{:_, :_, :'$1'}, - [{:<, :'$1', currtime - 60}], - [:'$1']}] - :ets.select_delete(state.outbox, outbox_cleanup) - _ -> - Logger.info "Dropping message #{inspect msg} for peer #{inspect peer_id}: peer not in database" - end - {:noreply, state} - end - - def handle_cast({:try_connect, pk_list}, state) do - for pk <- pk_list do - case :ets.lookup(:peer_db, pk) do - [{^pk, nil, ip, port}] -> - add_peer(ip, port, state) - _ -> nil - end - end - {:noreply, state} - end - - def handle_cast({:add_peer, ip, port}, state) do - add_peer(ip, port, state) - {:noreply, state} - end - - def handle_info({:DOWN, _, :process, pid, _}, state) do - :ets.match_delete(:shard_procs, {:_, pid}) - {:noreply, state} - end - - defp add_peer(ip, port, state) do - spawn fn -> - case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do - {:ok, client} -> - {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}}) - :ok = :gen_tcp.controlling_process(client, pid) - _ -> - Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped" - end - end - end - - - # ================ - # PUBLIC INTERFACE - # ================ - - - @doc""" - Connect to a peer specified by ip address and port - """ - def add_peer(ip, port) do - GenServer.cast(__MODULE__, {:add_peer, ip, port}) - end - - @doc""" - Send message to a peer specified by peer id - """ - def send(peer_id, msg) do - case :ets.lookup(:peer_db, peer_id) do - [{ ^peer_id, pid, _, _}] when pid != nil-> - GenServer.cast(pid, {:send_msg, msg}) - _ -> - GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg}) - end - end - - @doc""" - Dispatch incoming message to correct shard process - """ - def dispatch(peer_id, {shard_id, path, msg}) do - case :ets.lookup(:shard_db, shard_id) do - [] -> - __MODULE__.send(peer_id, {:not_interested, shard_id}) - [_] -> - case :ets.match(:shard_peer_db, {shard_id, peer_id}) do - [] -> - GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_id}) - _ -> nil - end - case :ets.lookup(:shard_procs, {shard_id, path}) do - [{ {^shard_id, ^path}, pid }] -> - GenServer.cast(pid, {:msg, peer_id, shard_id, path, msg}) - [] -> - Logger.info("Warning: dropping message for #{inspect shard_id}/#{inspect path}, no handler running.\n\t#{inspect msg}") - end - end - end - - def dispatch(peer_id, {:interested, shards}) do - GenServer.cast(__MODULE__, {:interested, peer_id, shards}) - end - - def dispatch(peer_id, {:not_interested, shard}) do - GenServer.cast(__MODULE__, {:not_interested, peer_id, shard}) - end - - @doc""" - Register a process as the main process for a shard. - - Returns either :ok or :redundant, in which case the process must exit. - """ - def register(shard_id, manifest, pid) do - GenServer.call(__MODULE__, {:register, shard_id, manifest, pid}) - end - - @doc""" - Register a process as the handler for shard packets for a given path. - """ - def dispatch_to(shard_id, path, pid) do - GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid}) - end -end diff --git a/lib/net/tcpconn.ex b/lib/net/tcpconn.ex deleted file mode 100644 index 44669bb..0000000 --- a/lib/net/tcpconn.ex +++ /dev/null @@ -1,106 +0,0 @@ -defmodule SNet.TCPConn do - use GenServer, restart: :temporary - require Salty.Box.Curve25519xchacha20poly1305, as: Box - require Salty.Sign.Ed25519, as: Sign - require Logger - - def start_link(state) do - GenServer.start_link(__MODULE__, state) - end - - def init(state) do - GenServer.cast(self(), :handshake) - {:ok, state} - end - - def handle_call(:get_host_str, _from, state) do - {:reply, "#{state.his_pkey|>Base.encode16|>String.downcase}@#{to_string(:inet_parse.ntoa(state.addr))}:#{state.port}", state} - end - - def handle_cast(:handshake, state) do - socket = state.socket - - {srv_pkey, srv_skey} = Shard.Identity.get_keypair - {:ok, sess_pkey, sess_skey} = Box.keypair - {:ok, challenge} = Salty.Random.buf 32 - - # Exchange public keys and challenge - hello = {srv_pkey, sess_pkey, challenge, state.my_port} - :gen_tcp.send(socket, :erlang.term_to_binary hello) - {:ok, pkt} = :gen_tcp.recv(socket, 0) - {cli_pkey, cli_sess_pkey, cli_challenge, his_port} = :erlang.binary_to_term(pkt, [:safe]) - - # Do challenge and check their challenge - {:ok, cli_challenge_sign} = Sign.sign_detached(cli_challenge, srv_skey) - pkt = encode_pkt(cli_challenge_sign, cli_sess_pkey, sess_skey) - :gen_tcp.send(socket, pkt) - - {:ok, pkt} = :gen_tcp.recv(socket, 0) - challenge_sign = decode_pkt(pkt, cli_sess_pkey, sess_skey) - :ok = Sign.verify_detached(challenge_sign, challenge, cli_pkey) - - expected_suffix = Application.get_env(:shard, :peer_id_suffix) - len = byte_size(expected_suffix) - ^len = :binary.longest_common_suffix([cli_pkey, expected_suffix]) - - # Connected - :inet.setopts(socket, [active: true]) - - {:ok, {addr, port}} = :inet.peername socket - state =%{ socket: socket, - my_pkey: srv_pkey, - my_skey: srv_skey, - his_pkey: cli_pkey, - conn_my_pkey: sess_pkey, - conn_my_skey: sess_skey, - conn_his_pkey: cli_sess_pkey, - addr: addr, - port: port, - his_port: his_port - } - GenServer.cast(Shard.Manager, {:peer_up, cli_pkey, self(), addr, his_port}) - Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}" - - {:noreply, state} - end - - def handle_cast({:send_msg, msg}, state) do - msgbin = :erlang.term_to_binary msg - enc = encode_pkt(msgbin, state.conn_his_pkey, state.conn_my_skey) - :gen_tcp.send(state.socket, enc) - {:noreply, state} - end - - defp encode_pkt(pkt, pk, sk) do - {:ok, n} = Salty.Random.buf Box.noncebytes - {:ok, msg} = Box.easy(pkt, n, pk, sk) - n <> msg - end - - defp decode_pkt(pkt, pk, sk) do - n = binary_part(pkt, 0, Box.noncebytes) - enc = binary_part(pkt, Box.noncebytes, (byte_size pkt) - Box.noncebytes) - {:ok, msg} = Box.open_easy(enc, n, pk, sk) - msg - end - - def handle_info({:tcp, _socket, raw_data}, state) do - msg = decode_pkt(raw_data, state.conn_his_pkey, state.conn_my_skey) - msg_data = :erlang.binary_to_term(msg, [:safe]) - Shard.Manager.dispatch(state.his_pkey, msg_data) - {:noreply, state} - end - - def handle_info({:tcp_closed, _socket}, state) do - Logger.info "Disconnected: #{print_id state} at #{inspect state.addr}:#{state.port}" - GenServer.cast(Shard.Manager, {:peer_down, state.his_pkey, state.addr, state.his_port}) - exit(:normal) - end - - defp print_id(state) do - state.his_pkey - |> binary_part(0, 8) - |> Base.encode16 - |> String.downcase - end -end diff --git a/lib/net/tcpserver.ex b/lib/net/tcpserver.ex deleted file mode 100644 index 46552a4..0000000 --- a/lib/net/tcpserver.ex +++ /dev/null @@ -1,26 +0,0 @@ -defmodule SNet.TCPServer do - require Logger - use Task, restart: :permanent - - def start_link(port) do - Task.start_link(__MODULE__, :accept, [port]) - end - - @doc """ - Starts accepting connections on the given `port`. - """ - def accept(port) do - {:ok, socket} = :gen_tcp.listen(port, - [:binary, packet: 2, active: false, reuseaddr: true]) - Logger.info "Accepting connections on port #{port}" - loop_acceptor(socket, port) - end - - defp loop_acceptor(socket, my_port) do - {:ok, client} = :gen_tcp.accept(socket) - {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port}}) - :ok = :gen_tcp.controlling_process(client, pid) - loop_acceptor(socket, my_port) - end -end - diff --git a/mix.exs b/mix.exs deleted file mode 100644 index 7192feb..0000000 --- a/mix.exs +++ /dev/null @@ -1,33 +0,0 @@ -defmodule Shard.MixProject do - use Mix.Project - - def project do - [ - app: :shard, - version: "0.1.0", - elixir: "~> 1.6", - build_embedded: Mix.env == :prod, - start_permanent: Mix.env() == :prod, - deps: deps(), - test_coverage: [tool: ExCoveralls], - preferred_cli_env: [coveralls: :test, "coveralls.detail": :test, "coveralls.post": :test, "coveralls.html": :test] - ] - end - - # Run "mix help compile.app" to learn about applications. - def application do - [ - extra_applications: [:logger], - mod: {Shard.Application, []} - ] - end - - # Run "mix help deps" to learn about dependencies. - defp deps do - [ - {:excoveralls, "~> 0.10", only: :test}, - - {:salty, "~> 0.1.3", hex: :libsalty}, - ] - end -end diff --git a/mix.lock b/mix.lock deleted file mode 100644 index 4f92013..0000000 --- a/mix.lock +++ /dev/null @@ -1,20 +0,0 @@ -%{ - "certifi": {:hex, :certifi, "2.3.1", "d0f424232390bf47d82da8478022301c561cf6445b5b5fb6a84d49a9e76d2639", [:rebar3], [{:parse_trans, "3.2.0", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm"}, - "cowboy": {:hex, :cowboy, "1.1.2", "61ac29ea970389a88eca5a65601460162d370a70018afe6f949a29dca91f3bb0", [:rebar3], [{:cowlib, "~> 1.0.2", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3.2", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm"}, - "cowlib": {:hex, :cowlib, "1.0.2", "9d769a1d062c9c3ac753096f868ca121e2730b9a377de23dec0f7e08b1df84ee", [:make], [], "hexpm"}, - "elixir_make": {:hex, :elixir_make, "0.4.2", "332c649d08c18bc1ecc73b1befc68c647136de4f340b548844efc796405743bf", [:mix], [], "hexpm"}, - "ex2ms": {:hex, :ex2ms, "1.5.0", "19e27f9212be9a96093fed8cdfbef0a2b56c21237196d26760f11dfcfae58e97", [:mix], [], "hexpm"}, - "excoveralls": {:hex, :excoveralls, "0.10.0", "a4508bdd408829f38e7b2519f234b7fd5c83846099cda348efcb5291b081200c", [:mix], [{:hackney, "~> 1.13", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"}, - "hackney": {:hex, :hackney, "1.13.0", "24edc8cd2b28e1c652593833862435c80661834f6c9344e84b6a2255e7aeef03", [:rebar3], [{:certifi, "2.3.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.2", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"}, - "idna": {:hex, :idna, "5.1.2", "e21cb58a09f0228a9e0b95eaa1217f1bcfc31a1aaa6e1fdf2f53a33f7dbd9494", [:rebar3], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"}, - "jason": {:hex, :jason, "1.1.1", "d3ccb840dfb06f2f90a6d335b536dd074db748b3e7f5b11ab61d239506585eb2", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"}, - "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"}, - "mime": {:hex, :mime, "1.3.0", "5e8d45a39e95c650900d03f897fbf99ae04f60ab1daa4a34c7a20a5151b7a5fe", [:mix], [], "hexpm"}, - "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], [], "hexpm"}, - "parse_trans": {:hex, :parse_trans, "3.2.0", "2adfa4daf80c14dc36f522cf190eb5c4ee3e28008fc6394397c16f62a26258c2", [:rebar3], [], "hexpm"}, - "plug": {:hex, :plug, "1.3.6", "bcdf94ac0f4bc3b804bdbdbde37ebf598bd7ed2bfa5106ed1ab5984a09b7e75f", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1", [hex: :cowboy, repo: "hexpm", optional: true]}, {:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}], "hexpm"}, - "ranch": {:hex, :ranch, "1.3.2", "e4965a144dc9fbe70e5c077c65e73c57165416a901bd02ea899cfd95aa890986", [:rebar3], [], "hexpm"}, - "salty": {:hex, :libsalty, "0.1.3", "13332eb13ac995f5deb76903b44f96f740e1e3a6e511222bffdd8b42cd079ffb", [:make, :mix], [{:elixir_make, "~> 0.4", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm"}, - "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], [], "hexpm"}, - "unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [:rebar3], [], "hexpm"}, -} diff --git a/shard/.formatter.exs b/shard/.formatter.exs new file mode 100644 index 0000000..525446d --- /dev/null +++ b/shard/.formatter.exs @@ -0,0 +1,4 @@ +# Used by "mix format" +[ + inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"] +] diff --git a/shard/.gitignore b/shard/.gitignore new file mode 100644 index 0000000..ed31458 --- /dev/null +++ b/shard/.gitignore @@ -0,0 +1,26 @@ +# The directory Mix will write compiled artifacts to. +/_build/ + +# If you run "mix test --cover", coverage assets end up here. +/cover/ + +# The directory Mix downloads your dependencies sources to. +/deps/ + +# Where 3rd-party dependencies like ExDoc output generated docs. +/doc/ + +# Ignore .fetch files in case you like to edit your project deps locally. +/.fetch + +# If the VM crashes, it generates a dump, let's ignore it too. +erl_crash.dump + +# Also ignore archive artifacts (built via "mix archive.build"). +*.ez + +# Ignore package tarball (built via "mix hex.build"). +shard-*.tar + +# vim files +*.swp diff --git a/shard/README.md b/shard/README.md new file mode 100644 index 0000000..7abb1ce --- /dev/null +++ b/shard/README.md @@ -0,0 +1,16 @@ +# Shard + +Tests of peer-to-peer stuff. Right now it's a little chat application with replication of message history over peers. Nothing is secure, contrary to the name of the project. + +## Installation + +Download and install [Elixir](https://elixir-lang.org/). + +``` +mix deps.get +mix compile +mix run --no-halt +``` + +P2P port is 4044 by default, can be changed by setting the `$PORT` environment variable. HTTP interface is on port `$PORT`+1000. + diff --git a/shard/config/config.exs b/shard/config/config.exs new file mode 100644 index 0000000..f970078 --- /dev/null +++ b/shard/config/config.exs @@ -0,0 +1,42 @@ +# This file is responsible for configuring your application +# and its dependencies with the aid of the Mix.Config module. +use Mix.Config + +# This configuration is loaded before any dependency and is restricted +# to this project. If another project depends on this project, this +# file won't be loaded nor affect the parent project. For this reason, +# if you want to provide default values for your application for +# 3rd-party users, it should be done in your "mix.exs" file. + +# You can configure your application as: +# +# config :shard, key: :value +# +# and access this configuration in your application as: +# +# Application.get_env(:shard, :key) +# +# You can also configure a 3rd-party app: +# +# config :logger, level: :info +# + + +# Peer id suffix +# ============== +# This Shard instance will only connect to other instances that use +# the same suffix. +# +# On first run, the instance will try to generate a peer id that +# has this suffix. This is done by brute-force testing, therefore +# it is not recommended to use long suffixes. +config :shard, peer_id_suffix: "S" + + +# It is also possible to import configuration files, relative to this +# directory. For example, you can emulate configuration per environment +# by uncommenting the line below and defining dev.exs, test.exs and such. +# Configuration from the imported file will override the ones defined +# here (which is why it is important to import them last). +# +# import_config "#{Mix.env}.exs" diff --git a/shard/lib/app/blockstore.ex b/shard/lib/app/blockstore.ex new file mode 100644 index 0000000..8e4fddc --- /dev/null +++ b/shard/lib/app/blockstore.ex @@ -0,0 +1,149 @@ +defmodule SApp.BlockStore do + @moduledoc """ + A module that implements a content-adressable storage (blocks, or pages, + identified by the hash of their contents). + + This is not a shard, it is a side process that a shard may use to store its data. + + TODO: WIP + """ + + use GenServer + + @enforce_keys [:pid] + defstruct [:pid, :prefer_ask] + + + defmodule State do + defstruct [:shard_id, :path, :store, :reqs, :retries] + end + + + def start_link(shard_id, path) do + GenServer.start_link(__MODULE__, [shard_id, path]) + end + + def init([shard_id, path]) do + Shard.Manager.dispatch_to(shard_id, path, self()) + + {:ok, %State{shard_id: shard_id, path: path, store: %{}, reqs: %{}, retries: %{}}} + end + + def handle_call({:get, key, prefer_ask}, from, state) do + case state.store[key] do + nil -> + case prefer_ask do + [_ | _] -> + for peer <- prefer_ask do + Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) + end + _ -> + ask_random_peers(state, key) + end + reqs_key = case state.reqs[key] do + nil -> + MapSet.put(MapSet.new(), from) + ms -> + MapSet.put(ms, from) + end + state = put_in(state.reqs[key], reqs_key) + {:noreply, state} + v -> + {:reply, v, state} + end + end + + def handle_call({:put, val}, _from, state) do + hash = SData.term_hash val + state = %{state | store: Map.put(state.store, hash, val)} + {:reply, hash, state} + end + + def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do + state = case msg do + {:get, key} -> + case state.store[key] do + nil -> + Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}}) + v -> + Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, v}}) + end + state + {:info, hash, value} -> + if SData.term_hash value == hash do + reqs = case state.reqs[hash] do + nil -> state.reqs + pids -> + for pid <- pids do + GenServer.reply(pid, value) + end + Map.delete(state.reqs, hash) + end + state = %{state | retries: Map.delete(state.retries, hash)} + %{state | store: Map.put(state.store, hash, value), reqs: reqs} + else + state + end + {:not_found, key} -> + if state.reqs[key] != nil and state.store[key] == nil do + nretry = case state.retries[key] do + nil -> 1 + n -> n+1 + end + if nretry < 3 do + ask_random_peers(state, key) + %{state | retries: Map.put(state.retries, key, nretry)} + else + for pid <- state.reqs[key] do + GenServer.reply(pid, nil) + end + state = %{state | reqs: Map.delete(state.reqs, key)} + state = %{state | retries: Map.delete(state.retries, key)} + state + end + else + state + end + end + {:noreply, state} + end + + def ask_random_peers(state, key) do + peers = :ets.lookup(:shard_peer_db, state.shard_id) + |> Enum.shuffle + |> Enum.take(3) + for {_, peer} <- peers do + Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) + end + end + + + defimpl SData.PageStore do + def put(store, page) do + hash = GenServer.call(store.pid, {:put, page}) + { hash, store } + end + + def get(store, hash) do + try do + GenServer.call(store.pid, {:get, hash, store.prefer_ask}) + catch + :exit, {:timeout, _} -> nil + end + end + + def copy(store, other_store, hash) do + page = SData.PageStore.get(other_store, hash) + refs = SData.Page.refs(page) + for ref <- refs do + copy(store, other_store, ref) + end + GenServer.call(store.pid, {:put, page}) + store + end + + def free(store, _hash) do + store ## DO SOMETHING??? + end + end +end diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex new file mode 100644 index 0000000..051fab6 --- /dev/null +++ b/shard/lib/app/chat.ex @@ -0,0 +1,208 @@ +defmodule SApp.Chat do + @moduledoc """ + Shard application for a replicated chat room with full history. + + Chat rooms are globally identified by their channel name. + A chat room manifest is of the form: + + {:chat, channel_name} + + Future improvements: + - message signing + - storage of the chatroom messages to disk + - storage of the known peers that have this channel to disk + - use a DHT to find peers that are interested in this channel + - epidemic broadcast (carefull not to be too costly, + maybe by limiting the number of peers we talk to) + - partial synchronization only == data distributed over peers + """ + + use GenServer + + require Logger + alias SData.MerkleSearchTree, as: MST + + @doc """ + Start a process that connects to a given channel + """ + def start_link(channel) do + GenServer.start_link(__MODULE__, channel) + end + + @doc """ + Initialize channel process. + """ + def init(channel) do + manifest = {:chat, channel} + id = SData.term_hash manifest + + case Shard.Manager.register(id, manifest, self()) do + :ok -> + Shard.Manager.dispatch_to(id, nil, self()) + {:ok, block_store} = SApp.BlockStore.start_link(id, :block_store) + mst = %MST{store: %SApp.BlockStore{pid: block_store}, + cmp: &msg_cmp/2} + GenServer.cast(self(), :init_pull) + {:ok, + %{channel: channel, + id: id, + manifest: manifest, + block_store: block_store, + mst: mst, + subs: MapSet.new, + } + } + :redundant -> + exit(:redundant) + end + end + + @doc """ + Implementation of the :manifest call that returns the chat room's manifest + """ + def handle_call(:manifest, _from, state) do + {:reply, state.manifest, state} + end + + def handle_call({:read_history, top_bound, num}, _from, state) do + ret = MST.last(state.mst, top_bound, num) + {:reply, ret, state} + end + + @doc """ + Implementation of the :init_pull handler, which is called when the + process starts. It contacts all currently connected peers and asks them to + send data for this channel if they have some. + """ + def handle_cast(:init_pull, state) do + for {_, pid, _, _} <- :ets.tab2list(:peer_db) do + GenServer.cast(pid, {:send_msg, {:interested, [state.id]}}) + end + {:noreply, state} + end + + @doc """ + Implementation of the :chat_send handler. This is the main handler that is used + to send a message to the chat room. Puts the message in the store and syncs + with all connected peers. + """ + def handle_cast({:chat_send, msg}, state) do + msgitem = {(System.os_time :seconds), + Shard.Identity.get_nickname(), + msg} + prev_root = state.mst.root + mst = MST.insert(state.mst, msgitem) + state = %{state | mst: mst} + + for pid <- state.subs do + if Process.alive?(pid) do + send(pid, {:chat_send, state.channel, msgitem}) + end + end + + notif = {:append, prev_root, msgitem, mst.root} + for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do + Shard.Manager.send(peer_id, {state.id, nil, notif}) + end + + {:noreply, state} + end + + @doc """ + Implementation of the :interested handler, this is called when a peer we are + connected to asks to recieve data for this channel. + """ + def handle_cast({:interested, peer_id}, state) do + Shard.Manager.send(peer_id, {state.id, nil, {:root, state.mst.root}}) + {:noreply, state} + end + + def handle_cast({:subscribe, pid}, state) do + Process.monitor(pid) + new_subs = MapSet.put(state.subs, pid) + {:noreply, %{ state | subs: new_subs }} + end + + @doc """ + Implementation of the :msg handler, which is the main handler for messages + comming from other peers concerning this chat room. + + Messages are: + - `{:get, start}`: get some messages starting at a given Merkle hash + - `{:info, start, list, rest}`: put some messages and informs of the + Merkle hash of the store of older messages. + """ + def handle_cast({:msg, peer_id, _shard_id, nil, msg}, state) do + state = case msg do + {:get_manifest} -> + Shard.Manager.send(peer_id, {state.id, nil, {:manifest, state.manifest}}) + state + {:append, prev_root, msgitem, new_root} -> + # Append message: one single mesage has arrived + if new_root == state.mst.root do + # We already have the message, do nothing + state + else + # Try adding the message + if prev_root == state.mst.root do + mst2 = MST.insert(state.mst, msgitem) + if mst2.root == new_root do + # This was the only message missing, we are happy! + state = %{state | mst: mst2} + msg_callback(state, msgitem) + state + else + # More messages are missing, start a full merge + init_merge(state, new_root, peer_id) + end + else + init_merge(state, new_root, peer_id) + end + end + {:root, new_root} -> + if new_root == state.mst.root do + # already up to date, ignore + state + else + init_merge(state, new_root, peer_id) + end + x -> + Logger.info("Unhandled message: #{inspect x}") + state + end + {:noreply, state} + end + + defp init_merge(state, new_root, source_peer) do + # TODO: make the merge asynchronous + mgmst = %{state.mst | root: new_root} + mgmst = put_in(mgmst.store.prefer_ask, [source_peer]) + mst = MST.merge(state.mst, mgmst, fn msgitem, true -> msg_callback(state, msgitem) end) + %{state | mst: mst} + end + + def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do + new_subs = MapSet.delete(state.subs, pid) + {:noreply, %{ state | subs: new_subs }} + end + + defp msg_callback(state, {ts, nick, msg}) do + for pid <- state.subs do + if Process.alive?(pid) do + send(pid, {:chat_recv, state.channel, {ts, nick, msg}}) + end + end + end + + defp msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do + cond do + ts1 > ts2 -> :after + ts1 < ts2 -> :before + nick1 > nick2 -> :after + nick1 < nick2 -> :before + msg1 > msg2 -> :after + msg1 < msg2 -> :before + true -> :duplicate + end + end +end diff --git a/shard/lib/application.ex b/shard/lib/application.ex new file mode 100644 index 0000000..3e3a6ac --- /dev/null +++ b/shard/lib/application.ex @@ -0,0 +1,33 @@ +defmodule Shard.Application do + @moduledoc """ + Main Shard application. + + Shard is a prototype peer-to-peer comunication platform with data + synchronization. + """ + + use Application + + def start(_type, _args) do + import Supervisor.Spec, warn: false + + {listen_port, _} = Integer.parse ((System.get_env "PORT") || "4044") + + # Define workers and child supervisors to be supervised + children = [ + Shard.Identity, + { DynamicSupervisor, strategy: :one_for_one, name: Shard.DynamicSupervisor }, + + # Networking + { SNet.TCPServer, listen_port }, + + # Applications & data store + { Shard.Manager, listen_port }, + ] + + # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html + # for other strategies and supported options + opts = [strategy: :one_for_one, name: Shard.Supervisor] + Supervisor.start_link(children, opts) + end +end diff --git a/shard/lib/cli/cli.ex b/shard/lib/cli/cli.ex new file mode 100644 index 0000000..2fbf8c2 --- /dev/null +++ b/shard/lib/cli/cli.ex @@ -0,0 +1,98 @@ +defmodule SCLI do + @moduledoc """ + Small command line interface for the chat application + """ + + def run() do + run(nil) + end + + defp run(pid) do + handle_messages() + + nick = Shard.Identity.get_nickname + prompt = case pid do + nil -> "(no channel) #{nick}: " + _ -> + {:chat, chan} = GenServer.call(pid, :manifest) + "##{chan} #{nick}: " + end + + str = prompt |> IO.gets |> String.trim + cond do + str == "/quit" -> + nil + String.slice(str, 0..0) == "/" -> + command = str |> String.slice(1..-1) |> String.split(" ") + pid2 = handle_command(pid, command) + run(pid2) + true -> + if str != "" do + GenServer.cast(pid, {:chat_send, str}) + end + run(pid) + end + end + + defp handle_messages() do + receive do + {:chat_recv, chan, {ts, nick, msg}} -> + IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} ##{chan} <#{nick}> #{msg}" + handle_messages() + {:chat_send, _, _} -> + # do nothing + handle_messages() + after 10 -> nil + end + end + + defp handle_command(pid, ["connect", ipstr, portstr]) do + {:ok, ip} = :inet.parse_address (to_charlist ipstr) + {port, _} = Integer.parse portstr + Shard.Manager.add_peer(ip, port) + pid + end + + defp handle_command(pid, ["list"]) do + IO.puts "List of known channels:" + + for {_chid, manifest, _chpid} <- :ets.tab2list(:shard_db) do + {:chat, chan} = manifest + IO.puts "##{chan}" + end + pid + end + + defp handle_command(pid, ["hist"]) do + GenServer.call(pid, {:read_history, nil, 25}) + |> Enum.each(fn {{ts, nick, msg}, true} -> + IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}" + end) + pid + end + + defp handle_command(_pid, ["join", qchan]) do + list = for {_chid, manifest, chpid} <- :ets.tab2list(:shard_db), + {:chat, chan} = manifest, + do: {chan, chpid} + case List.keyfind(list, qchan, 0) do + nil -> + {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, qchan}) + GenServer.cast(pid, {:subscribe, self()}) + pid + {_, pid} -> + IO.puts "Switching to ##{qchan}" + pid + end + end + + defp handle_command(pid, ["nick", nick]) do + Shard.Identity.set_nickname nick + pid + end + + defp handle_command(pid, _cmd) do + IO.puts "Invalid command" + pid + end +end diff --git a/shard/lib/data/data.ex b/shard/lib/data/data.ex new file mode 100644 index 0000000..c2c659d --- /dev/null +++ b/shard/lib/data/data.ex @@ -0,0 +1,49 @@ +defmodule SData do + @moduledoc """ + Utility functions + + Compare functions are functions that compares stored items and provides a total order. + They must return: + - `:after` if the first argument is more recent + - `:duplicate` if the two items are the same + - `:before` if the first argument is older + These functions must only return :duplicate for equal items. + """ + + @doc """ + Calculate the hash of an Erlang term by first converting it to its + binary representation. + """ + def term_hash(term, algo \\ :sha256) do + :crypto.hash(algo, (:erlang.term_to_binary term)) + end + + @doc""" + Compare function for arbitrary terms using the Erlang order + """ + def cmp_term(a, b) do + cond do + a > b -> :after + a < b -> :before + a == b -> :duplicate + end + end + + @doc""" + Compare function for timestamped strings + """ + def cmp_ts_str({ts1, str1}, {ts2, str2}) do + cond do + ts1 > ts2 -> :after + ts1 < ts2 -> :before + str1 > str2 -> :after + str1 < str2 -> :before + true -> :duplicate + end + end + + @doc""" + Merge function for nils + """ + def merge_true(true, true), do: true +end diff --git a/shard/lib/data/merklelist.ex b/shard/lib/data/merklelist.ex new file mode 100644 index 0000000..9b44ee8 --- /dev/null +++ b/shard/lib/data/merklelist.ex @@ -0,0 +1,143 @@ +defmodule SData.MerkleList do + @moduledoc""" + A simple Merkle list store. + + Future improvements: + - When messages are inserted other than at the top, all intermediate hashes + change. Keep track of the mapping from old hashes to new hashes so that get + requests can work even for hashes that are not valid anymore. + - group items in "pages" (bigger bundles) + """ + + defstruct [:root, :top, :cmp, :store] + + @doc""" + Create a Merkle list store. + + `cmp` is a compare function that respects the interface defined in module `SData`. + """ + def new(cmp) do + root_item = :root + root_hash = SData.term_hash root_item + state = %SData.MerkleList{ + root: root_hash, + top: root_hash, + cmp: cmp, + store: %{ root_hash => root_item } + } + state + end + + defp push(state, item) do + new_item = {item, state.top} + new_item_hash = SData.term_hash new_item + new_store = Map.put(state.store, new_item_hash, new_item) + %{ state | :top => new_item_hash, :store => new_store } + end + + defp pop(state) do + if state.top == state.root do + :error + else + {item, next} = Map.get(state.store, state.top) + new_store = Map.delete(state.store, state.top) + new_state = %{ state | :top => next, :store => new_store } + {:ok, item, new_state} + end + end + + @doc""" + Insert a list of items in the store. + + A callback function may be specified that is called on any item + that is sucessfully added, i.e. that wasn't present in the store before. + """ + def insert_many(state, items, callback \\ (fn _ -> nil end)) do + items_sorted = Enum.sort(items, fn (x, y) -> state.cmp.(x, y) == :after end) + insert_many_aux(state, items_sorted, callback) + end + + defp insert_many_aux(state, [], _callback) do + state + end + + defp insert_many_aux(state, [item | rest], callback) do + case pop(state) do + :error -> + new_state = push(insert_many_aux(state, rest, callback), item) + callback.(item) + new_state + {:ok, front, state_rest} -> + case state.cmp.(item, front) do + :after -> + new_state = push(insert_many_aux(state, rest, callback), item) + callback.(item) + new_state + :duplicate -> insert_many_aux(state, rest, callback) + :before -> push(insert_many_aux(state_rest, [item | rest], callback), front) + end + end + end + + @doc""" + Insert a single item in the store. + + A callback function may be specified that is called on the item + if it is sucessfully added, i.e. it wasn't present in the store before. + """ + def insert(state, item, callback \\ (fn _ -> nil end)) do + insert_many(state, [item], callback) + end + + @doc""" + Read some items from the state. + + The two parameters are optional: + - qbegin : hash of the first item to read + - qlimit : number of items to read + """ + def read(state, qbegin \\ nil, qlimit \\ nil) do + begin = qbegin || state.top + limit = qlimit || 20 + get_items_list(state, begin, limit) + end + + @doc""" + Get the hash of the last item + """ + def top(state) do + state.top + end + + @doc""" + Get the hash of the root item + """ + def root(state) do + state.root + end + + @doc""" + Check if the store holds a certain item + """ + def has(state, hash) do + Map.has_key?(state.store, hash) + end + + defp get_items_list(state, begin, limit) do + case limit do + 0 -> {:ok, [], begin} + _ -> + case Map.fetch(state.store, begin) do + {:ok, :root} -> + {:ok, [], nil } + {:ok, {item, next}} -> + case get_items_list(state, next, limit - 1) do + {:ok, rest, past} -> + {:ok, [ item | rest ], past } + {:error, reason} -> {:error, reason} + end + :error -> {:error, begin} + end + end + end +end diff --git a/shard/lib/data/merklesearchtree.ex b/shard/lib/data/merklesearchtree.ex new file mode 100644 index 0000000..941d31d --- /dev/null +++ b/shard/lib/data/merklesearchtree.ex @@ -0,0 +1,387 @@ +defmodule SData.MerkleSearchTree do + @moduledoc""" + A Merkle search tree. + + A node of the tree is + { + level, + hash_of_node | nil, + [ + { item_low_bound, hash_of_node | nil }, + { item_low_bound, hash_of_node | nil }, + ... + } + } + """ + + alias SData.PageStore, as: Store + + @doc""" + Create a new Merkle search tree. + + This structure can be used as a set with only true keys, + or as a map if a merge function is given. + + `cmp` is a compare function for keys as defined in module `SData`. + + `merge` is a function for merging two items that have the same key. + """ + defstruct root: nil, + store: SData.LocalStore.new, + cmp: &SData.cmp_term/2, + merge: &SData.merge_true/2 + + + defmodule Page do + defstruct [:level, :low, :list] + end + + defimpl SData.Page, for: Page do + def refs(page) do + refs = for {_, _, h} <- page.list, h != nil, do: h + if page.low != nil do + [ page.low | refs ] + else + refs + end + end + end + + + @doc""" + Insert an item into the search tree. + """ + def insert(state, key, value \\ true) do + level = calc_level(key) + {hash, store} = insert_at(state, state.store, state.root, key, level, value) + %{ state | root: hash, store: store } + end + + defp insert_at(s, store, root, key, level, value) do + {new_page, store} = if root == nil do + { %Page{ level: level, low: nil, list: [ { key, value, nil } ] }, store } + else + %Page{ level: plevel, low: low, list: lst } = Store.get(store, root) + [ { k0, _, _} | _ ] = lst + cond do + plevel < level -> + {low, high, store} = split(s, store, root, key) + { %Page{ level: level, low: low, list: [ { key, value, high } ] }, store } + plevel == level -> + store = Store.free(store, root) + case s.cmp.(key, k0) do + :before -> + {low2a, low2b, store} = split(s, store, low, key) + { %Page{ level: level, low: low2a, list: [ { key, value, low2b } | lst] }, store } + _ -> + {new_lst, store} = aux_insert_after_first(s, store, lst, key, value) + { %Page{ level: plevel, low: low, list: new_lst }, store } + end + plevel > level -> + store = Store.free(store, root) + case s.cmp.(key, k0) do + :before -> + {new_low, store} = insert_at(s, store, low, key, level, value) + { %Page{ level: plevel, low: new_low, list: lst }, store } + :after -> + {new_lst, store} = aux_insert_sub_after_first(s, store, lst, key, level, value) + { %Page{ level: plevel, low: low, list: new_lst }, store } + end + end + end + Store.put(store, new_page) # returns {hash, store} + end + + defp split(s, store, hash, key) do + if hash == nil do + {nil, nil, store} + else + %Page{ level: level, low: low, list: lst } = Store.get(store, hash) || Store.get(s.store, hash) + store = Store.free(store, hash) + [ { k0, _, _} | _ ] = lst + case s.cmp.(key, k0) do + :before -> + {lowlow, lowhi, store} = split(s, store, low, key) + newp2 = %Page{ level: level, low: lowhi, list: lst } + {newp2h, store} = Store.put(store, newp2) + {lowlow, newp2h, store} + :after -> + {lst1, p2, store} = split_aux(s, store, lst, key, level) + newp1 = %Page{ level: level, low: low, list: lst1} + {newp1h, store} = Store.put(store, newp1) + {newp1h, p2, store} + end + end + end + + defp split_aux(s, store, lst, key, level) do + case lst do + [ {k1, v1, r1} ] -> + if s.cmp.(k1, key) == :duplicate do + raise "Bad logic" + end + {r1l, r1h, store} = split(s, store, r1, key) + { [{k1, v1, r1l}], r1h, store } + [ {k1, v1, r1} = fst, {k2, v2, r2} = rst1 | rst ] -> + case s.cmp.(key, k2) do + :before -> + {r1l, r1h, store} = split(s, store, r1, key) + p2 = %Page{ level: level, low: r1h, list: [ {k2, v2, r2} | rst ] } + {p2h, store} = Store.put(store, p2) + { [{k1, v1, r1l}], p2h, store } + :after -> + {rst2, hi, store} = split_aux(s, store, [rst1 | rst], key, level) + { [ fst | rst2 ], hi, store } + :duplicate -> + raise "Bad logic" + end + end + end + + defp aux_insert_after_first(s, store, lst, key, value) do + case lst do + [ {k1, v1, r1} ] -> + case s.cmp.(k1, key) do + :duplicate -> + { [ {k1, s.merge.(v1, value), r1} ], store } + :before -> + {r1a, r1b, new_store} = split(s, store, r1, key) + { [ {k1, v1, r1a}, {key, value, r1b} ], new_store } + end + [ {k1, v1, r1} = fst, {k2, v2, r2} = rst1 | rst ] -> + case s.cmp.(k1, key) do + :duplicate -> + { [ {k1, s.merge.(v1, value), r1}, rst1 | rst ], store } + :before -> + case s.cmp.(k2, key) do + :after -> + {r1a, r1b, new_store} = split(s, store, r1, key) + { [ {k1, v1, r1a}, {key, value, r1b}, {k2, v2, r2} | rst ], new_store } + _ -> + {rst2, new_store} = aux_insert_after_first(s, store, [rst1 | rst], key, value) + { [ fst | rst2 ], new_store } + end + end + end + end + + defp aux_insert_sub_after_first(s, store, lst, key, level, value) do + case lst do + [ {k1, v1, r1} ] -> + if s.cmp.(k1, key) == :duplicate do + raise "Bad logic" + end + {r1new, store_new} = insert_at(s, store, r1, key, level, value) + { [{k1, v1, r1new}], store_new } + [ {k1, v1, r1} = fst, {k2, _, _} = rst1 | rst ] -> + if s.cmp.(k1, key) == :duplicate do + raise "Bad logic" + end + case s.cmp.(key, k2) do + :before -> + {r1new, store_new} = insert_at(s, store, r1, key, level, value) + { [{k1, v1, r1new}, rst1 | rst], store_new } + _ -> + {rst2, new_store} = aux_insert_sub_after_first(s, store, [rst1 |rst], key, level, value) + { [ fst | rst2 ], new_store } + end + end + end + + @doc""" + Merge values from another MST in this MST. + + The merge is not symmetrical in the sense that: + - new pages are added in the store of the first argument + - the callback is called for all items found in the second argument and not the first + """ + def merge(to, from, callback \\ fn _, _ -> nil end) do + { store, root } = merge_aux(to, from, to.store, to.root, from.root, callback) + %{ to | store: store, root: root } + end + + defp merge_aux(s1, s2, store, r1, r2, callback) do + case {r1, r2} do + _ when r1 == r2 -> { store, r1 } + {_, nil} -> + { store, r1 } + {nil, _} -> + store = Store.copy(store, s2.store, r2) + rec_callback(store, r2, callback) + { store, r2 } + _ -> + %Page{ level: level1, low: low1, list: lst1 } = Store.get(store, r1) + %Page{ level: level2, low: low2, list: lst2 } = Store.get(store, r2) || Store.get(s2.store, r2) + { level, low1, lst1, low2, lst2 } = cond do + level1 == level2 -> {level1, low1, lst1, low2, lst2} + level1 > level2 -> {level1, low1, lst1, r2, []} + level2 > level1 -> {level2, r1, [], low2, lst2} + end + { store, low, lst } = merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback) + page = %Page{ level: level, low: low, list: lst } + {hash, store} = Store.put(store, page) + {store, hash} + end + end + + defp merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback) do + case {lst1, lst2} do + { [], [] } -> + {store, hash} = merge_aux(s1, s2, store, low1, low2, callback) + {store, hash, []} + { [], [ {k, v, r} | rst2 ] } -> + {low1l, low1h, store} = split(s1, store, low1, k) + {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback) + callback.(k, v) + {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, [], r, rst2, callback) + {store, newlow, [ {k, v, newr} | newrst ]} + { [ {k, v, r} | rst1 ], [] } -> + {low2l, low2h, store} = split(s2, store, low2, k) + {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback) + {store, newr, newrst} = merge_aux_rec(s1, s2, store, r, rst1, low2h, [], callback) + {store, newlow, [ {k, v, newr} | newrst ]} + { [ {k1, v1, r1} | rst1 ], [ {k2, v2, r2} | rst2 ] } -> + case s1.cmp.(k1, k2) do + :before -> + {low2l, low2h, store} = split(s2, store, low2, k1) + {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback) + {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, low2h, lst2, callback) + {store, newlow, [ {k1, v1, newr} | newrst ]} + :after -> + {low1l, low1h, store} = split(s1, store, low1, k2) + {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback) + callback.(k2, v2) + {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, lst1, r2, rst2, callback) + {store, newlow, [ {k2, v2, newr} | newrst ]} + :duplicate -> + {store, newlow} = merge_aux(s1, s2, store, low1, low2, callback) + newv = s1.merge.(v1, v2) ## TODO: callback here ?? + {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, r2, rst2, callback) + {store, newlow, [ {k1, newv, newr} | newrst ]} + end + end + end + + defp rec_callback(store, root, callback) do + case root do + nil -> nil + _ -> + %Page{ level: _, low: low, list: lst } = Store.get(store, root) + rec_callback(store, low, callback) + for {k, v, rst} <- lst do + callback.(k, v) + rec_callback(store, rst, callback) + end + end + end + + @doc""" + Get value for a specific key in search tree. + """ + def get(state, key) do + get(state, state.root, key) + end + + defp get(s, root, key) do + case root do + nil -> nil + _ -> + %Page{ level: _, low: low, list: lst } = Store.get(s.store, root) + get_aux(s, low, lst, key) + end + end + + defp get_aux(s, low, lst, key) do + case lst do + [] -> + get(s, low, key) + [ {k, v, low2} | rst ] -> + case s.cmp.(key, k) do + :duplicate -> v + :before -> + get(s, low, key) + :after -> + get_aux(s, low2, rst, key) + end + end + end + + @doc""" + Get the last n items of the tree, or the last n items + strictly before given upper bound if non nil + """ + def last(state, top_bound, num) do + last(state, state.root, top_bound, num) + end + + defp last(s, root, top_bound, num) do + case root do + nil -> [] + _ -> + %Page{ level: _, low: low, list: lst } = Store.get(s.store, root) + last_aux(s, low, lst, top_bound, num) + end + end + + defp last_aux(s, low, lst, top_bound, num) do + case lst do + [] -> + last(s, low, top_bound, num) + [ {k, v, low2} | rst ] -> + if top_bound == nil or s.cmp.(top_bound, k) == :after do + items = last_aux(s, low2, rst, top_bound, num) + items = if Enum.count(items) < num do + [ {k, v} | items ] + else + items + end + cnt = Enum.count items + if cnt < num do + last(s, low, top_bound, num - cnt) ++ items + else + items + end + else + last(s, low, top_bound, num) + end + end + end + + + @doc""" + Dump Merkle search tree structure. + """ + def dump(state) do + dump(state.store, state.root, "") + end + + defp dump(store, root, lvl) do + case root do + nil -> + IO.puts(lvl <> "nil") + _ -> + %Page{ level: level, low: low, list: lst} = Store.get(store, root) + IO.puts(lvl <> "#{root|>Base.encode16} (#{level})") + dump(store, low, lvl <> " ") + for {k, v, r} <- lst do + IO.puts(lvl<>"- #{inspect k} => #{inspect v}") + dump(store, r, lvl <> " ") + end + end + end + + defp calc_level(key) do + key + |> SData.term_hash + |> Base.encode16 + |> String.to_charlist + |> count_leading_zeroes + end + + defp count_leading_zeroes('0' ++ rest) do + 1 + count_leading_zeroes(rest) + end + defp count_leading_zeroes(_) do + 0 + end +end diff --git a/shard/lib/data/store.ex b/shard/lib/data/store.ex new file mode 100644 index 0000000..ca12cd0 --- /dev/null +++ b/shard/lib/data/store.ex @@ -0,0 +1,91 @@ +defprotocol SData.Page do + @moduledoc""" + Protocol to be implemented by objects that are used as data pages + in a pagestore and that may reference other data pages by their hash. + """ + + @fallback_to_any true + + @doc""" + Get hashes of all pages referenced by this page. + """ + def refs(page) +end + +defimpl SData.Page, for: Any do + def refs(_page), do: [] +end + + +defprotocol SData.PageStore do + @moduledoc""" + Protocol to be implemented for page stores to allow their + manipulation. + + This protocol may also be implemented by store proxies that track + operations and implement different synchronization or caching mechanisms. + """ + + @doc""" + Put a page. Argument is the content of the page, returns the + hash that the store has associated to it. + + Returns {hash, store} + """ + def put(store, page) + + @doc""" + Get a page referenced by its hash. + + Returns page + """ + def get(store, hash) + + @doc""" + Copy to the store a page and all its references from the other store. + In the case of pages on the network in a distributed store, this may + be lazy. + + Returns store + """ + def copy(store, other_store, hash) + + @doc""" + Free a page referenced by its hash, marking it as no longer needed. + + Returns store + """ + def free(store, hash) +end + + +defmodule SData.LocalStore do + defstruct [:pages] + + def new() do + %SData.LocalStore{ pages: %{} } + end +end + +defimpl SData.PageStore, for: SData.LocalStore do + def put(store, page) do + hash = SData.term_hash page + store = %{ store | pages: Map.put(store.pages, hash, page) } + { hash, store } + end + + def get(store, hash) do + store.pages[hash] + end + + def copy(store, other_store, hash) do + page = SData.PageStore.get(other_store, hash) + refs = SData.Page.refs(page) + store = Enum.reduce(refs, store, fn x, acc -> copy(acc, other_store, x) end) + %{ store | pages: Map.put(store.pages, hash, page) } + end + + def free(store, hash) do + %{ store | pages: Map.delete(store.pages, hash) } + end +end diff --git a/shard/lib/identity.ex b/shard/lib/identity.ex new file mode 100644 index 0000000..f1899df --- /dev/null +++ b/shard/lib/identity.ex @@ -0,0 +1,46 @@ +defmodule Shard.Identity do + use Agent + require Salty.Sign.Ed25519, as: Sign + require Logger + + def start_link(_) do + Agent.start_link(__MODULE__, :init, [], name: __MODULE__) + end + + def init() do + Logger.info "Generating keypair..." + {pk, sk} = gen_keypair(Application.get_env(:shard, :peer_id_suffix)) + nick_suffix = pk + |> binary_part(0, 3) + |> Base.encode16 + |> String.downcase + %{ + keypair: {pk, sk}, + nickname: "Anon" <> nick_suffix, + } + end + + defp gen_keypair(suffix, n \\ 0) do + {:ok, pk, sk} = Sign.keypair + if rem(n, 10000) == 0 do + Logger.info "#{n}... expected #{:math.pow(256, byte_size(suffix))}" + end + if :binary.longest_common_suffix([pk, suffix]) == byte_size(suffix) do + {pk, sk} + else + gen_keypair(suffix, n+1) + end + end + + def get_keypair() do + Agent.get(__MODULE__, &(&1.keypair)) + end + + def get_nickname() do + Agent.get(__MODULE__, &(&1.nickname)) + end + + def set_nickname(newnick) do + Agent.update(__MODULE__, &(%{&1 | nickname: newnick})) + end +end diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex new file mode 100644 index 0000000..82984d6 --- /dev/null +++ b/shard/lib/manager.ex @@ -0,0 +1,243 @@ +defmodule Shard.Manager do + @moduledoc""" + Maintains several important tables : + + - :peer_db + + List of + { id, {conn_pid, con_start, conn_n_msg} | nil, ip, port, last_seen } + + - :shard_db + + List of + { id, manifest, pid | nil } + + - :shard_procs + + List of + { {id, path}, pid } + + - :shard_peer_db + + Mult-list of + { shard_id, peer_id } + + + And an internal table : + + - :outbox + + Multi-list of + { peer_id, message } + + """ + + use GenServer + + require Logger + + def start_link(my_port) do + GenServer.start_link(__MODULE__, my_port, name: __MODULE__) + end + + def init(my_port) do + :ets.new(:peer_db, [:set, :protected, :named_table]) + :ets.new(:shard_db, [:set, :protected, :named_table]) + :ets.new(:shard_procs, [:set, :protected, :named_table]) + :ets.new(:shard_peer_db, [:bag, :protected, :named_table]) + outbox = :ets.new(:outbox, [:bag, :private]) + + {:ok, %{my_port: my_port, outbox: outbox} } + end + + def handle_call({:register, shard_id, manifest, pid}, _from, state) do + will_live = case :ets.lookup(:shard_db, shard_id) do + [{ ^shard_id, _, pid }] -> not Process.alive?(pid) + _ -> true + end + reply = if will_live do + Process.monitor(pid) + :ets.insert(:shard_db, {shard_id, manifest, pid}) + :ok + else + :redundant + end + {:reply, reply, state} + end + + def handle_cast({:dispatch_to, shard_id, path, pid}, state) do + :ets.insert(:shard_procs, { {shard_id, path}, pid }) + Process.monitor(pid) + {:noreply, state} + end + + def handle_cast({:interested, peer_id, shards}, state) do + for shard_id <- shards do + case :ets.lookup(:shard_db, shard_id) do + [{ ^shard_id, _, pid }] -> + :ets.insert(:shard_peer_db, {shard_id, peer_id}) + GenServer.cast(pid, {:interested, peer_id}) + [] -> nil + end + end + {:noreply, state} + end + + def handle_cast({:not_interested, peer_id, shard_id}, state) do + :ets.match_delete(:shard_peer_db, {shard_id, peer_id}) + {:noreply, state} + end + + def handle_cast({:shard_peer_db_insert, shard_id, peer_id}, state) do + :ets.insert(:shard_peer_db, {shard_id, peer_id}) + {:noreply, state} + end + + def handle_cast({:peer_up, pk, pid, ip, port}, state) do + for [pk2] <- :ets.match(:peer_db, {:'$1', :_, ip, port}) do + if pk2 != pk do + # obsolete peer information + :ets.delete(:peer_db, pk2) + :ets.match_delete(:shard_peer_db, {:_, pk2}) + end + end + :ets.insert(:peer_db, {pk, pid, ip, port}) + + # Send interested message for all our shards + id_list = (for {id, _, _} <- :ets.tab2list(:shard_db), do: id) + GenServer.cast(pid, {:send_msg, {:interested, id_list}}) + + # Send queued messages + for {_, msg, _} <- :ets.lookup(state.outbox, pk) do + GenServer.cast(pid, {:send_msg, msg}) + end + :ets.delete(state.outbox, pk) + + {:noreply, state} + end + + def handle_cast({:peer_down, pk, ip, port}, state) do + :ets.insert(:peer_db, {pk, nil, ip, port}) + {:noreply, state} + end + + def handle_cast({:connect_and_send, peer_id, msg}, state) do + case :ets.lookup(:peer_db, peer_id) do + [{^peer_id, nil, ip, port}] -> + add_peer(ip, port, state) + currtime = System.os_time :second + :ets.insert(state.outbox, {peer_id, msg, currtime}) + outbox_cleanup = [{{:_, :_, :'$1'}, + [{:<, :'$1', currtime - 60}], + [:'$1']}] + :ets.select_delete(state.outbox, outbox_cleanup) + _ -> + Logger.info "Dropping message #{inspect msg} for peer #{inspect peer_id}: peer not in database" + end + {:noreply, state} + end + + def handle_cast({:try_connect, pk_list}, state) do + for pk <- pk_list do + case :ets.lookup(:peer_db, pk) do + [{^pk, nil, ip, port}] -> + add_peer(ip, port, state) + _ -> nil + end + end + {:noreply, state} + end + + def handle_cast({:add_peer, ip, port}, state) do + add_peer(ip, port, state) + {:noreply, state} + end + + def handle_info({:DOWN, _, :process, pid, _}, state) do + :ets.match_delete(:shard_procs, {:_, pid}) + {:noreply, state} + end + + defp add_peer(ip, port, state) do + spawn fn -> + case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do + {:ok, client} -> + {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}}) + :ok = :gen_tcp.controlling_process(client, pid) + _ -> + Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped" + end + end + end + + + # ================ + # PUBLIC INTERFACE + # ================ + + + @doc""" + Connect to a peer specified by ip address and port + """ + def add_peer(ip, port) do + GenServer.cast(__MODULE__, {:add_peer, ip, port}) + end + + @doc""" + Send message to a peer specified by peer id + """ + def send(peer_id, msg) do + case :ets.lookup(:peer_db, peer_id) do + [{ ^peer_id, pid, _, _}] when pid != nil-> + GenServer.cast(pid, {:send_msg, msg}) + _ -> + GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg}) + end + end + + @doc""" + Dispatch incoming message to correct shard process + """ + def dispatch(peer_id, {shard_id, path, msg}) do + case :ets.lookup(:shard_db, shard_id) do + [] -> + __MODULE__.send(peer_id, {:not_interested, shard_id}) + [_] -> + case :ets.match(:shard_peer_db, {shard_id, peer_id}) do + [] -> + GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_id}) + _ -> nil + end + case :ets.lookup(:shard_procs, {shard_id, path}) do + [{ {^shard_id, ^path}, pid }] -> + GenServer.cast(pid, {:msg, peer_id, shard_id, path, msg}) + [] -> + Logger.info("Warning: dropping message for #{inspect shard_id}/#{inspect path}, no handler running.\n\t#{inspect msg}") + end + end + end + + def dispatch(peer_id, {:interested, shards}) do + GenServer.cast(__MODULE__, {:interested, peer_id, shards}) + end + + def dispatch(peer_id, {:not_interested, shard}) do + GenServer.cast(__MODULE__, {:not_interested, peer_id, shard}) + end + + @doc""" + Register a process as the main process for a shard. + + Returns either :ok or :redundant, in which case the process must exit. + """ + def register(shard_id, manifest, pid) do + GenServer.call(__MODULE__, {:register, shard_id, manifest, pid}) + end + + @doc""" + Register a process as the handler for shard packets for a given path. + """ + def dispatch_to(shard_id, path, pid) do + GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid}) + end +end diff --git a/shard/lib/net/tcpconn.ex b/shard/lib/net/tcpconn.ex new file mode 100644 index 0000000..44669bb --- /dev/null +++ b/shard/lib/net/tcpconn.ex @@ -0,0 +1,106 @@ +defmodule SNet.TCPConn do + use GenServer, restart: :temporary + require Salty.Box.Curve25519xchacha20poly1305, as: Box + require Salty.Sign.Ed25519, as: Sign + require Logger + + def start_link(state) do + GenServer.start_link(__MODULE__, state) + end + + def init(state) do + GenServer.cast(self(), :handshake) + {:ok, state} + end + + def handle_call(:get_host_str, _from, state) do + {:reply, "#{state.his_pkey|>Base.encode16|>String.downcase}@#{to_string(:inet_parse.ntoa(state.addr))}:#{state.port}", state} + end + + def handle_cast(:handshake, state) do + socket = state.socket + + {srv_pkey, srv_skey} = Shard.Identity.get_keypair + {:ok, sess_pkey, sess_skey} = Box.keypair + {:ok, challenge} = Salty.Random.buf 32 + + # Exchange public keys and challenge + hello = {srv_pkey, sess_pkey, challenge, state.my_port} + :gen_tcp.send(socket, :erlang.term_to_binary hello) + {:ok, pkt} = :gen_tcp.recv(socket, 0) + {cli_pkey, cli_sess_pkey, cli_challenge, his_port} = :erlang.binary_to_term(pkt, [:safe]) + + # Do challenge and check their challenge + {:ok, cli_challenge_sign} = Sign.sign_detached(cli_challenge, srv_skey) + pkt = encode_pkt(cli_challenge_sign, cli_sess_pkey, sess_skey) + :gen_tcp.send(socket, pkt) + + {:ok, pkt} = :gen_tcp.recv(socket, 0) + challenge_sign = decode_pkt(pkt, cli_sess_pkey, sess_skey) + :ok = Sign.verify_detached(challenge_sign, challenge, cli_pkey) + + expected_suffix = Application.get_env(:shard, :peer_id_suffix) + len = byte_size(expected_suffix) + ^len = :binary.longest_common_suffix([cli_pkey, expected_suffix]) + + # Connected + :inet.setopts(socket, [active: true]) + + {:ok, {addr, port}} = :inet.peername socket + state =%{ socket: socket, + my_pkey: srv_pkey, + my_skey: srv_skey, + his_pkey: cli_pkey, + conn_my_pkey: sess_pkey, + conn_my_skey: sess_skey, + conn_his_pkey: cli_sess_pkey, + addr: addr, + port: port, + his_port: his_port + } + GenServer.cast(Shard.Manager, {:peer_up, cli_pkey, self(), addr, his_port}) + Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}" + + {:noreply, state} + end + + def handle_cast({:send_msg, msg}, state) do + msgbin = :erlang.term_to_binary msg + enc = encode_pkt(msgbin, state.conn_his_pkey, state.conn_my_skey) + :gen_tcp.send(state.socket, enc) + {:noreply, state} + end + + defp encode_pkt(pkt, pk, sk) do + {:ok, n} = Salty.Random.buf Box.noncebytes + {:ok, msg} = Box.easy(pkt, n, pk, sk) + n <> msg + end + + defp decode_pkt(pkt, pk, sk) do + n = binary_part(pkt, 0, Box.noncebytes) + enc = binary_part(pkt, Box.noncebytes, (byte_size pkt) - Box.noncebytes) + {:ok, msg} = Box.open_easy(enc, n, pk, sk) + msg + end + + def handle_info({:tcp, _socket, raw_data}, state) do + msg = decode_pkt(raw_data, state.conn_his_pkey, state.conn_my_skey) + msg_data = :erlang.binary_to_term(msg, [:safe]) + Shard.Manager.dispatch(state.his_pkey, msg_data) + {:noreply, state} + end + + def handle_info({:tcp_closed, _socket}, state) do + Logger.info "Disconnected: #{print_id state} at #{inspect state.addr}:#{state.port}" + GenServer.cast(Shard.Manager, {:peer_down, state.his_pkey, state.addr, state.his_port}) + exit(:normal) + end + + defp print_id(state) do + state.his_pkey + |> binary_part(0, 8) + |> Base.encode16 + |> String.downcase + end +end diff --git a/shard/lib/net/tcpserver.ex b/shard/lib/net/tcpserver.ex new file mode 100644 index 0000000..46552a4 --- /dev/null +++ b/shard/lib/net/tcpserver.ex @@ -0,0 +1,26 @@ +defmodule SNet.TCPServer do + require Logger + use Task, restart: :permanent + + def start_link(port) do + Task.start_link(__MODULE__, :accept, [port]) + end + + @doc """ + Starts accepting connections on the given `port`. + """ + def accept(port) do + {:ok, socket} = :gen_tcp.listen(port, + [:binary, packet: 2, active: false, reuseaddr: true]) + Logger.info "Accepting connections on port #{port}" + loop_acceptor(socket, port) + end + + defp loop_acceptor(socket, my_port) do + {:ok, client} = :gen_tcp.accept(socket) + {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port}}) + :ok = :gen_tcp.controlling_process(client, pid) + loop_acceptor(socket, my_port) + end +end + diff --git a/shard/mix.exs b/shard/mix.exs new file mode 100644 index 0000000..7192feb --- /dev/null +++ b/shard/mix.exs @@ -0,0 +1,33 @@ +defmodule Shard.MixProject do + use Mix.Project + + def project do + [ + app: :shard, + version: "0.1.0", + elixir: "~> 1.6", + build_embedded: Mix.env == :prod, + start_permanent: Mix.env() == :prod, + deps: deps(), + test_coverage: [tool: ExCoveralls], + preferred_cli_env: [coveralls: :test, "coveralls.detail": :test, "coveralls.post": :test, "coveralls.html": :test] + ] + end + + # Run "mix help compile.app" to learn about applications. + def application do + [ + extra_applications: [:logger], + mod: {Shard.Application, []} + ] + end + + # Run "mix help deps" to learn about dependencies. + defp deps do + [ + {:excoveralls, "~> 0.10", only: :test}, + + {:salty, "~> 0.1.3", hex: :libsalty}, + ] + end +end diff --git a/shard/mix.lock b/shard/mix.lock new file mode 100644 index 0000000..4f92013 --- /dev/null +++ b/shard/mix.lock @@ -0,0 +1,20 @@ +%{ + "certifi": {:hex, :certifi, "2.3.1", "d0f424232390bf47d82da8478022301c561cf6445b5b5fb6a84d49a9e76d2639", [:rebar3], [{:parse_trans, "3.2.0", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm"}, + "cowboy": {:hex, :cowboy, "1.1.2", "61ac29ea970389a88eca5a65601460162d370a70018afe6f949a29dca91f3bb0", [:rebar3], [{:cowlib, "~> 1.0.2", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3.2", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm"}, + "cowlib": {:hex, :cowlib, "1.0.2", "9d769a1d062c9c3ac753096f868ca121e2730b9a377de23dec0f7e08b1df84ee", [:make], [], "hexpm"}, + "elixir_make": {:hex, :elixir_make, "0.4.2", "332c649d08c18bc1ecc73b1befc68c647136de4f340b548844efc796405743bf", [:mix], [], "hexpm"}, + "ex2ms": {:hex, :ex2ms, "1.5.0", "19e27f9212be9a96093fed8cdfbef0a2b56c21237196d26760f11dfcfae58e97", [:mix], [], "hexpm"}, + "excoveralls": {:hex, :excoveralls, "0.10.0", "a4508bdd408829f38e7b2519f234b7fd5c83846099cda348efcb5291b081200c", [:mix], [{:hackney, "~> 1.13", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"}, + "hackney": {:hex, :hackney, "1.13.0", "24edc8cd2b28e1c652593833862435c80661834f6c9344e84b6a2255e7aeef03", [:rebar3], [{:certifi, "2.3.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.2", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"}, + "idna": {:hex, :idna, "5.1.2", "e21cb58a09f0228a9e0b95eaa1217f1bcfc31a1aaa6e1fdf2f53a33f7dbd9494", [:rebar3], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"}, + "jason": {:hex, :jason, "1.1.1", "d3ccb840dfb06f2f90a6d335b536dd074db748b3e7f5b11ab61d239506585eb2", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"}, + "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"}, + "mime": {:hex, :mime, "1.3.0", "5e8d45a39e95c650900d03f897fbf99ae04f60ab1daa4a34c7a20a5151b7a5fe", [:mix], [], "hexpm"}, + "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], [], "hexpm"}, + "parse_trans": {:hex, :parse_trans, "3.2.0", "2adfa4daf80c14dc36f522cf190eb5c4ee3e28008fc6394397c16f62a26258c2", [:rebar3], [], "hexpm"}, + "plug": {:hex, :plug, "1.3.6", "bcdf94ac0f4bc3b804bdbdbde37ebf598bd7ed2bfa5106ed1ab5984a09b7e75f", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1", [hex: :cowboy, repo: "hexpm", optional: true]}, {:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}], "hexpm"}, + "ranch": {:hex, :ranch, "1.3.2", "e4965a144dc9fbe70e5c077c65e73c57165416a901bd02ea899cfd95aa890986", [:rebar3], [], "hexpm"}, + "salty": {:hex, :libsalty, "0.1.3", "13332eb13ac995f5deb76903b44f96f740e1e3a6e511222bffdd8b42cd079ffb", [:make, :mix], [{:elixir_make, "~> 0.4", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm"}, + "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], [], "hexpm"}, + "unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [:rebar3], [], "hexpm"}, +} diff --git a/shard/test/conn_test.exs b/shard/test/conn_test.exs new file mode 100644 index 0000000..275f6dd --- /dev/null +++ b/shard/test/conn_test.exs @@ -0,0 +1,62 @@ +defmodule ShardTest.Conn do + use ExUnit.Case + doctest Shard.Application + + require Salty.Box.Curve25519xchacha20poly1305, as: Box + require Salty.Sign.Ed25519, as: Sign + + test "crypto connection" do + {:ok, srv_pkey, srv_skey} = Sign.keypair + {:ok, sess_pkey, sess_skey} = Box.keypair + {:ok, challenge} = Salty.Random.buf 32 + {:ok, socket} = :gen_tcp.connect {127,0,0,1}, 4044, [:binary, packet: 2, active: false] + + hello = {srv_pkey, sess_pkey, challenge, 0} + :gen_tcp.send(socket, :erlang.term_to_binary hello) + {:ok, pkt} = :gen_tcp.recv(socket, 0) + {cli_pkey, cli_sess_pkey, cli_challenge, _his_port} = :erlang.binary_to_term(pkt, [:safe]) + + {:ok, cli_challenge_sign} = Sign.sign_detached(cli_challenge, srv_skey) + sendmsg(socket, cli_challenge_sign, cli_sess_pkey, sess_skey) + + challenge_sign = recvmsg(socket, cli_sess_pkey, sess_skey) + :ok = Sign.verify_detached(challenge_sign, challenge, cli_pkey) + + pkt = :erlang.binary_to_term(recvmsg(socket, cli_sess_pkey, sess_skey), [:safe]) + IO.puts (inspect pkt) + end + + defp sendmsg(sock, msg, pk, sk) do + {:ok, n} = Salty.Random.buf Box.noncebytes + {:ok, msg} = Box.easy(msg, n, pk, sk) + :gen_tcp.send(sock, n <> msg) + end + + defp recvmsg(sock, pk, sk) do + {:ok, pkt} = :gen_tcp.recv(sock, 0) + n = binary_part(pkt, 0, Box.noncebytes) + enc = binary_part(pkt, Box.noncebytes, (byte_size pkt) - Box.noncebytes) + {:ok, msg} = Box.open_easy(enc, n, pk, sk) + msg + end + + + test "set nickname" do + Shard.Identity.set_nickname "test bot" + end + + test "connect to other instance" do + Shard.Manager.add_peer({127, 0, 0, 1}, 4045) + receive do after 100 -> nil end + end + + test "connect to chat rooms" do + {:ok, pid1} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "test"}) + {:ok, pid2} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "other_test"}) + GenServer.cast(pid1, {:chat_send, "test msg 1"}) + GenServer.cast(pid2, {:chat_send, "test msg 2"}) + + {:error, :redundant} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "test"}) + end + +end diff --git a/shard/test/mkllst_test.exs b/shard/test/mkllst_test.exs new file mode 100644 index 0000000..68dacf3 --- /dev/null +++ b/shard/test/mkllst_test.exs @@ -0,0 +1,18 @@ +defmodule ShardTest.MklLst do + use ExUnit.Case + doctest Shard.Application + + test "merkle list" do + alias SData.MerkleList, as: ML + + mkl = ML.new(&SData.cmp_ts_str/2) + + {:ok, [], nil} = ML.read(mkl) + + mkl = ML.insert(mkl, {12, "aa, bb"}) + mkl = ML.insert_many(mkl, [{14, "qwerty"}, {8, "haha"}]) + mkl = ML.insert(mkl, {14, "qwerty"}) + {:ok, list, nil} = ML.read(mkl, nil, nil) + assert length(list) == 3 + end +end diff --git a/shard/test/mst_test.exs b/shard/test/mst_test.exs new file mode 100644 index 0000000..c1758ad --- /dev/null +++ b/shard/test/mst_test.exs @@ -0,0 +1,197 @@ +defmodule ShardTest.MST do + use ExUnit.Case + alias SData.MerkleSearchTree, as: MST + doctest Shard.Application + + test "merkle search tree 1" do + y = Enum.reduce(0..1000, %MST{}, + fn i, acc -> MST.insert(acc, i) end) + + + z = Enum.reduce(Enum.shuffle(0..1000), %MST{}, + fn i, acc -> MST.insert(acc, i) end) + + for i <- 0..1000 do + assert MST.get(y, i) == true + assert MST.get(z, i) == true + end + assert MST.get(y, 9999) == nil + assert MST.get(z, -1001) == nil + assert MST.get(z, 1.01) == nil + + IO.puts "y.root: #{y.root|>Base.encode16}" + IO.puts "z.root: #{z.root|>Base.encode16}" + assert y.root == z.root + end + + test "merkle search tree 2" do + items = for i <- 0..1000 do + h = SData.term_hash i + {h, SData.term_hash h} + end + + merge = fn a, b -> if a > b do a else b end end + + y = Enum.reduce(items, %MST{merge: merge}, + fn {k, v}, acc -> MST.insert(acc, k, v) end) + + z = Enum.reduce(Enum.shuffle(items), %MST{merge: merge}, + fn {k, v}, acc -> MST.insert(acc, k, v) end) + + for {k, v} <- items do + assert MST.get(y, k) == v + assert MST.get(z, k) == v + end + + IO.puts "y.root: #{y.root|>Base.encode16}" + IO.puts "z.root: #{z.root|>Base.encode16}" + assert y.root == z.root + end + + test "merkle search tree 3" do + merge = fn a, b -> if a > b do a else b end end + + y = Enum.reduce(0..1000, %MST{merge: merge}, + fn i, acc -> MST.insert(acc, i, i) end) + y = Enum.reduce(0..1000, y, + fn i, acc -> MST.insert(acc, i, i + 2 * rem(i, 2) - 1) end) + + + z = Enum.reduce(Enum.shuffle(0..1000), %MST{merge: merge}, + fn i, acc -> MST.insert(acc, i, i) end) + z = Enum.reduce(Enum.shuffle(0..1000), z, + fn i, acc -> MST.insert(acc, i, i + 2 * rem(i, 2) - 1) end) + + for i <- 0..1000 do + val = if rem(i, 2) == 1 do i+1 else i end + assert MST.get(y, i) == val + assert MST.get(z, i) == val + end + assert MST.get(y, 9999) == nil + assert MST.get(z, -1001) == nil + assert MST.get(z, 1.01) == nil + + IO.puts "y.root: #{y.root|>Base.encode16}" + IO.puts "z.root: #{z.root|>Base.encode16}" + assert y.root == z.root + end + + test "merkle search tree 4" do + items = for i <- 0..1000 do + h = SData.term_hash i + {h, SData.term_hash h} + end + + cmp = fn {a1, b1}, {a2, b2} -> + cond do + a1 < a2 -> :before + a1 > a2 -> :after + b1 > b2 -> :before + b1 < b2 -> :after + true -> :duplicate + end + end + + y = Enum.reduce(items, %MST{cmp: cmp}, + fn {a, b}, acc -> MST.insert(acc, {a, b}) end) + + z = Enum.reduce(Enum.shuffle(items), %MST{cmp: cmp}, + fn {a, b}, acc -> MST.insert(acc, {a, b}) end) + + for {k, v} <- items do + assert MST.get(y, {k, v}) == true + assert MST.get(z, {k, v}) == true + end + assert MST.get(z, {"foo", "bar"}) == nil + + IO.puts "y.root: #{y.root|>Base.encode16}" + IO.puts "z.root: #{z.root|>Base.encode16}" + assert y.root == z.root + + MST.last(y, nil, 10) + end + + test "merkle search tree 5: MST.last" do + y = Enum.reduce(0..1000, %MST{}, + fn i, acc -> MST.insert(acc, i) end) + + assert(MST.last(y, nil, 2) == [{999, true}, {1000, true}]) + assert(MST.last(y, 42, 2) == [{40, true}, {41, true}]) + + stuff = for i <- 100..199, do: {i, true} + assert MST.last(y, 200, 100) == stuff + + stuff = for i <- 200..299, do: {i, true} + assert MST.last(y, 300, 100) == stuff + + stuff = for i <- 200..499, do: {i, true} + assert MST.last(y, 500, 300) == stuff + end + + test "merkle search tree 6: MST.merge" do + y = Enum.reduce([1, 2, 42], %MST{}, fn i, acc -> MST.insert(acc, i) end) + z = Enum.reduce([2, 12], %MST{}, fn i, acc -> MST.insert(acc, i) end) + + IO.puts "y: " + MST.dump y + IO.puts "z: " + MST.dump z + + mg1 = MST.merge(y, z) + IO.puts "mg1: " + MST.dump mg1 + + mg2 = MST.merge(y, z) + IO.puts "mg2: " + MST.dump mg2 + assert mg1.root == mg2.root + end + + test "merkle search tree 7: MST.merge" do + items1 = (for i <- 1..1000, do: i*2+40) + items2 = (for i <- 1..1000, do: i*3) + + y = Enum.reduce(items1, %MST{}, fn i, acc -> MST.insert(acc, i) end) + z = Enum.reduce(items2, %MST{}, fn i, acc -> MST.insert(acc, i) end) + + IO.puts "(merge) y.root: #{y.root|>Base.encode16}" + IO.puts "(merge) z.root: #{z.root|>Base.encode16}" + + mg1 = MST.merge(y, z) + IO.puts "(merge) mg1.root: #{mg1.root|>Base.encode16}" + + mg2 = MST.merge(y, z) + IO.puts "(merge) mg2.root: #{mg2.root|>Base.encode16}" + + assert mg1.root == mg2.root + + items1t = (for i <- items1, do: {i, true}) + items2t = (for i <- items2, do: {i, true}) + assert MST.last(y, nil, 1000) == items1t + assert MST.last(z, nil, 1000) == items2t + + all_items = (items1t ++ items2t) |> Enum.sort |> Enum.uniq + assert MST.last(mg1, nil, 2000) == all_items + end + + test "merkle search tree 8: MST.merge callback" do + items1 = (for i <- 1..1000, do: i*2+40) + items2 = (for i <- 1..1000, do: i*3) + + y = Enum.reduce(items1, %MST{}, fn i, acc -> MST.insert(acc, i) end) + z = Enum.reduce(items2, %MST{}, fn i, acc -> MST.insert(acc, i) end) + + {:ok, cb_called} = Agent.start_link fn -> [] end + + cb = fn i, true -> Agent.update(cb_called, fn x -> [i | x] end) end + mg = MST.merge(y, z, cb) + + cb_vals = Agent.get cb_called, &(&1) + expected = MapSet.difference(MapSet.new(items2), MapSet.new(items1)) + |> MapSet.to_list + |> Enum.sort + |> Enum.reverse + assert expected == cb_vals + end + +end diff --git a/shard/test/test_helper.exs b/shard/test/test_helper.exs new file mode 100644 index 0000000..e5b6600 --- /dev/null +++ b/shard/test/test_helper.exs @@ -0,0 +1,8 @@ +ExUnit.start() + +case :gen_tcp.connect('localhost', 4045, []) do + {:ok, socket} -> + :gen_tcp.close(socket) + {:error, _reason} -> + Mix.raise "Please have another instance of Shard running at 127.0.0.1:4045, it can be launched with the command: PORT=4045 iex -S mix" +end diff --git a/test/conn_test.exs b/test/conn_test.exs deleted file mode 100644 index 275f6dd..0000000 --- a/test/conn_test.exs +++ /dev/null @@ -1,62 +0,0 @@ -defmodule ShardTest.Conn do - use ExUnit.Case - doctest Shard.Application - - require Salty.Box.Curve25519xchacha20poly1305, as: Box - require Salty.Sign.Ed25519, as: Sign - - test "crypto connection" do - {:ok, srv_pkey, srv_skey} = Sign.keypair - {:ok, sess_pkey, sess_skey} = Box.keypair - {:ok, challenge} = Salty.Random.buf 32 - {:ok, socket} = :gen_tcp.connect {127,0,0,1}, 4044, [:binary, packet: 2, active: false] - - hello = {srv_pkey, sess_pkey, challenge, 0} - :gen_tcp.send(socket, :erlang.term_to_binary hello) - {:ok, pkt} = :gen_tcp.recv(socket, 0) - {cli_pkey, cli_sess_pkey, cli_challenge, _his_port} = :erlang.binary_to_term(pkt, [:safe]) - - {:ok, cli_challenge_sign} = Sign.sign_detached(cli_challenge, srv_skey) - sendmsg(socket, cli_challenge_sign, cli_sess_pkey, sess_skey) - - challenge_sign = recvmsg(socket, cli_sess_pkey, sess_skey) - :ok = Sign.verify_detached(challenge_sign, challenge, cli_pkey) - - pkt = :erlang.binary_to_term(recvmsg(socket, cli_sess_pkey, sess_skey), [:safe]) - IO.puts (inspect pkt) - end - - defp sendmsg(sock, msg, pk, sk) do - {:ok, n} = Salty.Random.buf Box.noncebytes - {:ok, msg} = Box.easy(msg, n, pk, sk) - :gen_tcp.send(sock, n <> msg) - end - - defp recvmsg(sock, pk, sk) do - {:ok, pkt} = :gen_tcp.recv(sock, 0) - n = binary_part(pkt, 0, Box.noncebytes) - enc = binary_part(pkt, Box.noncebytes, (byte_size pkt) - Box.noncebytes) - {:ok, msg} = Box.open_easy(enc, n, pk, sk) - msg - end - - - test "set nickname" do - Shard.Identity.set_nickname "test bot" - end - - test "connect to other instance" do - Shard.Manager.add_peer({127, 0, 0, 1}, 4045) - receive do after 100 -> nil end - end - - test "connect to chat rooms" do - {:ok, pid1} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "test"}) - {:ok, pid2} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "other_test"}) - GenServer.cast(pid1, {:chat_send, "test msg 1"}) - GenServer.cast(pid2, {:chat_send, "test msg 2"}) - - {:error, :redundant} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "test"}) - end - -end diff --git a/test/mkllst_test.exs b/test/mkllst_test.exs deleted file mode 100644 index 68dacf3..0000000 --- a/test/mkllst_test.exs +++ /dev/null @@ -1,18 +0,0 @@ -defmodule ShardTest.MklLst do - use ExUnit.Case - doctest Shard.Application - - test "merkle list" do - alias SData.MerkleList, as: ML - - mkl = ML.new(&SData.cmp_ts_str/2) - - {:ok, [], nil} = ML.read(mkl) - - mkl = ML.insert(mkl, {12, "aa, bb"}) - mkl = ML.insert_many(mkl, [{14, "qwerty"}, {8, "haha"}]) - mkl = ML.insert(mkl, {14, "qwerty"}) - {:ok, list, nil} = ML.read(mkl, nil, nil) - assert length(list) == 3 - end -end diff --git a/test/mst_test.exs b/test/mst_test.exs deleted file mode 100644 index c1758ad..0000000 --- a/test/mst_test.exs +++ /dev/null @@ -1,197 +0,0 @@ -defmodule ShardTest.MST do - use ExUnit.Case - alias SData.MerkleSearchTree, as: MST - doctest Shard.Application - - test "merkle search tree 1" do - y = Enum.reduce(0..1000, %MST{}, - fn i, acc -> MST.insert(acc, i) end) - - - z = Enum.reduce(Enum.shuffle(0..1000), %MST{}, - fn i, acc -> MST.insert(acc, i) end) - - for i <- 0..1000 do - assert MST.get(y, i) == true - assert MST.get(z, i) == true - end - assert MST.get(y, 9999) == nil - assert MST.get(z, -1001) == nil - assert MST.get(z, 1.01) == nil - - IO.puts "y.root: #{y.root|>Base.encode16}" - IO.puts "z.root: #{z.root|>Base.encode16}" - assert y.root == z.root - end - - test "merkle search tree 2" do - items = for i <- 0..1000 do - h = SData.term_hash i - {h, SData.term_hash h} - end - - merge = fn a, b -> if a > b do a else b end end - - y = Enum.reduce(items, %MST{merge: merge}, - fn {k, v}, acc -> MST.insert(acc, k, v) end) - - z = Enum.reduce(Enum.shuffle(items), %MST{merge: merge}, - fn {k, v}, acc -> MST.insert(acc, k, v) end) - - for {k, v} <- items do - assert MST.get(y, k) == v - assert MST.get(z, k) == v - end - - IO.puts "y.root: #{y.root|>Base.encode16}" - IO.puts "z.root: #{z.root|>Base.encode16}" - assert y.root == z.root - end - - test "merkle search tree 3" do - merge = fn a, b -> if a > b do a else b end end - - y = Enum.reduce(0..1000, %MST{merge: merge}, - fn i, acc -> MST.insert(acc, i, i) end) - y = Enum.reduce(0..1000, y, - fn i, acc -> MST.insert(acc, i, i + 2 * rem(i, 2) - 1) end) - - - z = Enum.reduce(Enum.shuffle(0..1000), %MST{merge: merge}, - fn i, acc -> MST.insert(acc, i, i) end) - z = Enum.reduce(Enum.shuffle(0..1000), z, - fn i, acc -> MST.insert(acc, i, i + 2 * rem(i, 2) - 1) end) - - for i <- 0..1000 do - val = if rem(i, 2) == 1 do i+1 else i end - assert MST.get(y, i) == val - assert MST.get(z, i) == val - end - assert MST.get(y, 9999) == nil - assert MST.get(z, -1001) == nil - assert MST.get(z, 1.01) == nil - - IO.puts "y.root: #{y.root|>Base.encode16}" - IO.puts "z.root: #{z.root|>Base.encode16}" - assert y.root == z.root - end - - test "merkle search tree 4" do - items = for i <- 0..1000 do - h = SData.term_hash i - {h, SData.term_hash h} - end - - cmp = fn {a1, b1}, {a2, b2} -> - cond do - a1 < a2 -> :before - a1 > a2 -> :after - b1 > b2 -> :before - b1 < b2 -> :after - true -> :duplicate - end - end - - y = Enum.reduce(items, %MST{cmp: cmp}, - fn {a, b}, acc -> MST.insert(acc, {a, b}) end) - - z = Enum.reduce(Enum.shuffle(items), %MST{cmp: cmp}, - fn {a, b}, acc -> MST.insert(acc, {a, b}) end) - - for {k, v} <- items do - assert MST.get(y, {k, v}) == true - assert MST.get(z, {k, v}) == true - end - assert MST.get(z, {"foo", "bar"}) == nil - - IO.puts "y.root: #{y.root|>Base.encode16}" - IO.puts "z.root: #{z.root|>Base.encode16}" - assert y.root == z.root - - MST.last(y, nil, 10) - end - - test "merkle search tree 5: MST.last" do - y = Enum.reduce(0..1000, %MST{}, - fn i, acc -> MST.insert(acc, i) end) - - assert(MST.last(y, nil, 2) == [{999, true}, {1000, true}]) - assert(MST.last(y, 42, 2) == [{40, true}, {41, true}]) - - stuff = for i <- 100..199, do: {i, true} - assert MST.last(y, 200, 100) == stuff - - stuff = for i <- 200..299, do: {i, true} - assert MST.last(y, 300, 100) == stuff - - stuff = for i <- 200..499, do: {i, true} - assert MST.last(y, 500, 300) == stuff - end - - test "merkle search tree 6: MST.merge" do - y = Enum.reduce([1, 2, 42], %MST{}, fn i, acc -> MST.insert(acc, i) end) - z = Enum.reduce([2, 12], %MST{}, fn i, acc -> MST.insert(acc, i) end) - - IO.puts "y: " - MST.dump y - IO.puts "z: " - MST.dump z - - mg1 = MST.merge(y, z) - IO.puts "mg1: " - MST.dump mg1 - - mg2 = MST.merge(y, z) - IO.puts "mg2: " - MST.dump mg2 - assert mg1.root == mg2.root - end - - test "merkle search tree 7: MST.merge" do - items1 = (for i <- 1..1000, do: i*2+40) - items2 = (for i <- 1..1000, do: i*3) - - y = Enum.reduce(items1, %MST{}, fn i, acc -> MST.insert(acc, i) end) - z = Enum.reduce(items2, %MST{}, fn i, acc -> MST.insert(acc, i) end) - - IO.puts "(merge) y.root: #{y.root|>Base.encode16}" - IO.puts "(merge) z.root: #{z.root|>Base.encode16}" - - mg1 = MST.merge(y, z) - IO.puts "(merge) mg1.root: #{mg1.root|>Base.encode16}" - - mg2 = MST.merge(y, z) - IO.puts "(merge) mg2.root: #{mg2.root|>Base.encode16}" - - assert mg1.root == mg2.root - - items1t = (for i <- items1, do: {i, true}) - items2t = (for i <- items2, do: {i, true}) - assert MST.last(y, nil, 1000) == items1t - assert MST.last(z, nil, 1000) == items2t - - all_items = (items1t ++ items2t) |> Enum.sort |> Enum.uniq - assert MST.last(mg1, nil, 2000) == all_items - end - - test "merkle search tree 8: MST.merge callback" do - items1 = (for i <- 1..1000, do: i*2+40) - items2 = (for i <- 1..1000, do: i*3) - - y = Enum.reduce(items1, %MST{}, fn i, acc -> MST.insert(acc, i) end) - z = Enum.reduce(items2, %MST{}, fn i, acc -> MST.insert(acc, i) end) - - {:ok, cb_called} = Agent.start_link fn -> [] end - - cb = fn i, true -> Agent.update(cb_called, fn x -> [i | x] end) end - mg = MST.merge(y, z, cb) - - cb_vals = Agent.get cb_called, &(&1) - expected = MapSet.difference(MapSet.new(items2), MapSet.new(items1)) - |> MapSet.to_list - |> Enum.sort - |> Enum.reverse - assert expected == cb_vals - end - -end diff --git a/test/test_helper.exs b/test/test_helper.exs deleted file mode 100644 index e5b6600..0000000 --- a/test/test_helper.exs +++ /dev/null @@ -1,8 +0,0 @@ -ExUnit.start() - -case :gen_tcp.connect('localhost', 4045, []) do - {:ok, socket} -> - :gen_tcp.close(socket) - {:error, _reason} -> - Mix.raise "Please have another instance of Shard running at 127.0.0.1:4045, it can be launched with the command: PORT=4045 iex -S mix" -end -- cgit v1.2.3