author     Alex Auvolat <alex@adnab.me>    2018-09-01 16:06:23 +0200
committer  Alex Auvolat <alex@adnab.me>    2018-09-01 16:06:23 +0200
commit     c6ec33d6e612168e14d77007915a4ea423c55a2e (patch)
tree       8b5645651a0cc991b8ac9c68c388d84c8dbe73d2 /shard
parent     1a0ef154a421af60f6d57dfe861dacb844a7d142 (diff)
Move everything to subdirectory
Diffstat (limited to 'shard')
-rw-r--r--  shard/.formatter.exs                  4
-rw-r--r--  shard/.gitignore                     26
-rw-r--r--  shard/README.md                      16
-rw-r--r--  shard/config/config.exs              42
-rw-r--r--  shard/lib/app/blockstore.ex         149
-rw-r--r--  shard/lib/app/chat.ex               208
-rw-r--r--  shard/lib/application.ex             33
-rw-r--r--  shard/lib/cli/cli.ex                 98
-rw-r--r--  shard/lib/data/data.ex               49
-rw-r--r--  shard/lib/data/merklelist.ex        143
-rw-r--r--  shard/lib/data/merklesearchtree.ex  387
-rw-r--r--  shard/lib/data/store.ex              91
-rw-r--r--  shard/lib/identity.ex                46
-rw-r--r--  shard/lib/manager.ex                243
-rw-r--r--  shard/lib/net/tcpconn.ex            106
-rw-r--r--  shard/lib/net/tcpserver.ex           26
-rw-r--r--  shard/mix.exs                        33
-rw-r--r--  shard/mix.lock                       20
-rw-r--r--  shard/test/conn_test.exs             62
-rw-r--r--  shard/test/mkllst_test.exs           18
-rw-r--r--  shard/test/mst_test.exs             197
-rw-r--r--  shard/test/test_helper.exs            8
22 files changed, 2005 insertions, 0 deletions
diff --git a/shard/.formatter.exs b/shard/.formatter.exs
new file mode 100644
index 0000000..525446d
--- /dev/null
+++ b/shard/.formatter.exs
@@ -0,0 +1,4 @@
+# Used by "mix format"
+[
+ inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"]
+]
diff --git a/shard/.gitignore b/shard/.gitignore
new file mode 100644
index 0000000..ed31458
--- /dev/null
+++ b/shard/.gitignore
@@ -0,0 +1,26 @@
+# The directory Mix will write compiled artifacts to.
+/_build/
+
+# If you run "mix test --cover", coverage assets end up here.
+/cover/
+
+# The directory Mix downloads your dependencies sources to.
+/deps/
+
+# Where 3rd-party dependencies like ExDoc output generated docs.
+/doc/
+
+# Ignore .fetch files in case you like to edit your project deps locally.
+/.fetch
+
+# If the VM crashes, it generates a dump, let's ignore it too.
+erl_crash.dump
+
+# Also ignore archive artifacts (built via "mix archive.build").
+*.ez
+
+# Ignore package tarball (built via "mix hex.build").
+shard-*.tar
+
+# vim files
+*.swp
diff --git a/shard/README.md b/shard/README.md
new file mode 100644
index 0000000..7abb1ce
--- /dev/null
+++ b/shard/README.md
@@ -0,0 +1,16 @@
+# Shard
+
+Tests of peer-to-peer stuff. Right now it is a little chat application with replication of message history across peers. Nothing is secure, contrary to what the name of the project might suggest.
+
+## Installation
+
+Download and install [Elixir](https://elixir-lang.org/).
+
+```
+mix deps.get
+mix compile
+mix run --no-halt
+```
+
+The P2P port is 4044 by default; it can be changed by setting the `$PORT` environment variable. The HTTP interface is on port `$PORT`+1000.
+
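As a quick illustration of the port setup above (an editor's sketch, not part of this commit): with a second instance started as `PORT=4045 mix run --no-halt`, the first instance can be pointed at it from `iex -S mix` through the public manager API.

```elixir
# Sketch only: this mirrors what SCLI's "/connect 127.0.0.1 4045" command does.
Shard.Manager.add_peer({127, 0, 0, 1}, 4045)
```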
diff --git a/shard/config/config.exs b/shard/config/config.exs
new file mode 100644
index 0000000..f970078
--- /dev/null
+++ b/shard/config/config.exs
@@ -0,0 +1,42 @@
+# This file is responsible for configuring your application
+# and its dependencies with the aid of the Mix.Config module.
+use Mix.Config
+
+# This configuration is loaded before any dependency and is restricted
+# to this project. If another project depends on this project, this
+# file won't be loaded nor affect the parent project. For this reason,
+# if you want to provide default values for your application for
+# 3rd-party users, it should be done in your "mix.exs" file.
+
+# You can configure your application as:
+#
+# config :shard, key: :value
+#
+# and access this configuration in your application as:
+#
+# Application.get_env(:shard, :key)
+#
+# You can also configure a 3rd-party app:
+#
+# config :logger, level: :info
+#
+
+
+# Peer id suffix
+# ==============
+# This Shard instance will only connect to other instances that use
+# the same suffix.
+#
+# On first run, the instance will try to generate a peer id that
+# has this suffix. This is done by brute-force testing, therefore
+# it is not recommended to use long suffixes.
+config :shard, peer_id_suffix: "S"
+
+
+# It is also possible to import configuration files, relative to this
+# directory. For example, you can emulate configuration per environment
+# by uncommenting the line below and defining dev.exs, test.exs and such.
+# Configuration from the imported file will override the ones defined
+# here (which is why it is important to import them last).
+#
+# import_config "#{Mix.env}.exs"
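To make the brute-force note above concrete, here is a sketch (not part of the commit) of the suffix test that `Shard.Identity.gen_keypair/2` applies to each candidate key; a one-byte suffix such as `"S"` is expected to need on the order of 256 attempts.

```elixir
# Sketch: a candidate keypair is kept only if the public key ends with the suffix.
suffix = Application.get_env(:shard, :peer_id_suffix)            # "S" with the config above
{:ok, pk, _sk} = Salty.Sign.Ed25519.keypair()
suffix_ok = :binary.longest_common_suffix([pk, suffix]) == byte_size(suffix)
```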
diff --git a/shard/lib/app/blockstore.ex b/shard/lib/app/blockstore.ex
new file mode 100644
index 0000000..8e4fddc
--- /dev/null
+++ b/shard/lib/app/blockstore.ex
@@ -0,0 +1,149 @@
+defmodule SApp.BlockStore do
+ @moduledoc """
+  A module that implements content-addressable storage (blocks, or pages,
+  identified by the hash of their contents).
+
+  This is not a shard; it is a side process that a shard may use to store its data.
+
+ TODO: WIP
+ """
+
+ use GenServer
+
+ @enforce_keys [:pid]
+ defstruct [:pid, :prefer_ask]
+
+
+ defmodule State do
+ defstruct [:shard_id, :path, :store, :reqs, :retries]
+ end
+
+
+ def start_link(shard_id, path) do
+ GenServer.start_link(__MODULE__, [shard_id, path])
+ end
+
+ def init([shard_id, path]) do
+ Shard.Manager.dispatch_to(shard_id, path, self())
+
+ {:ok, %State{shard_id: shard_id, path: path, store: %{}, reqs: %{}, retries: %{}}}
+ end
+
+ def handle_call({:get, key, prefer_ask}, from, state) do
+ case state.store[key] do
+ nil ->
+ case prefer_ask do
+ [_ | _] ->
+ for peer <- prefer_ask do
+ Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
+ end
+ _ ->
+ ask_random_peers(state, key)
+ end
+ reqs_key = case state.reqs[key] do
+ nil ->
+ MapSet.put(MapSet.new(), from)
+ ms ->
+ MapSet.put(ms, from)
+ end
+ state = put_in(state.reqs[key], reqs_key)
+ {:noreply, state}
+ v ->
+ {:reply, v, state}
+ end
+ end
+
+ def handle_call({:put, val}, _from, state) do
+ hash = SData.term_hash val
+ state = %{state | store: Map.put(state.store, hash, val)}
+ {:reply, hash, state}
+ end
+
+ def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do
+ state = case msg do
+ {:get, key} ->
+ case state.store[key] do
+ nil ->
+ Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}})
+ v ->
+ Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, v}})
+ end
+ state
+ {:info, hash, value} ->
+      if SData.term_hash(value) == hash do
+ reqs = case state.reqs[hash] do
+ nil -> state.reqs
+ pids ->
+ for pid <- pids do
+ GenServer.reply(pid, value)
+ end
+ Map.delete(state.reqs, hash)
+ end
+ state = %{state | retries: Map.delete(state.retries, hash)}
+ %{state | store: Map.put(state.store, hash, value), reqs: reqs}
+ else
+ state
+ end
+ {:not_found, key} ->
+ if state.reqs[key] != nil and state.store[key] == nil do
+ nretry = case state.retries[key] do
+ nil -> 1
+ n -> n+1
+ end
+ if nretry < 3 do
+ ask_random_peers(state, key)
+ %{state | retries: Map.put(state.retries, key, nretry)}
+ else
+ for pid <- state.reqs[key] do
+ GenServer.reply(pid, nil)
+ end
+ state = %{state | reqs: Map.delete(state.reqs, key)}
+ state = %{state | retries: Map.delete(state.retries, key)}
+ state
+ end
+ else
+ state
+ end
+ end
+ {:noreply, state}
+ end
+
+ def ask_random_peers(state, key) do
+ peers = :ets.lookup(:shard_peer_db, state.shard_id)
+ |> Enum.shuffle
+ |> Enum.take(3)
+ for {_, peer} <- peers do
+ Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
+ end
+ end
+
+
+ defimpl SData.PageStore do
+ def put(store, page) do
+ hash = GenServer.call(store.pid, {:put, page})
+ { hash, store }
+ end
+
+ def get(store, hash) do
+ try do
+ GenServer.call(store.pid, {:get, hash, store.prefer_ask})
+ catch
+ :exit, {:timeout, _} -> nil
+ end
+ end
+
+ def copy(store, other_store, hash) do
+ page = SData.PageStore.get(other_store, hash)
+ refs = SData.Page.refs(page)
+ for ref <- refs do
+ copy(store, other_store, ref)
+ end
+ GenServer.call(store.pid, {:put, page})
+ store
+ end
+
+ def free(store, _hash) do
+ store ## DO SOMETHING???
+ end
+ end
+end
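A short sketch (not part of the commit) of the content-addressing contract that the block store exposes through `SData.PageStore`: a page is stored under the hash of its term, so the reader can verify whatever it gets back. It assumes a running application (so that `Shard.Manager` is up); `shard_id` is only a placeholder.

```elixir
# Sketch: put/get through the PageStore protocol, backed by a BlockStore process.
shard_id = SData.term_hash({:chat, "test"})
{:ok, pid} = SApp.BlockStore.start_link(shard_id, :block_store)
store = %SApp.BlockStore{pid: pid}
{hash, store} = SData.PageStore.put(store, {:page, ["some", "content"]})
^hash = SData.term_hash({:page, ["some", "content"]})   # the content address is the term hash
{:page, ["some", "content"]} = SData.PageStore.get(store, hash)
```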
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex
new file mode 100644
index 0000000..051fab6
--- /dev/null
+++ b/shard/lib/app/chat.ex
@@ -0,0 +1,208 @@
+defmodule SApp.Chat do
+ @moduledoc """
+ Shard application for a replicated chat room with full history.
+
+ Chat rooms are globally identified by their channel name.
+ A chat room manifest is of the form:
+
+ {:chat, channel_name}
+
+ Future improvements:
+ - message signing
+ - storage of the chatroom messages to disk
+ - storage of the known peers that have this channel to disk
+ - use a DHT to find peers that are interested in this channel
+    - epidemic broadcast (careful not to be too costly,
+ maybe by limiting the number of peers we talk to)
+ - partial synchronization only == data distributed over peers
+ """
+
+ use GenServer
+
+ require Logger
+ alias SData.MerkleSearchTree, as: MST
+
+ @doc """
+ Start a process that connects to a given channel
+ """
+ def start_link(channel) do
+ GenServer.start_link(__MODULE__, channel)
+ end
+
+ @doc """
+ Initialize channel process.
+ """
+ def init(channel) do
+ manifest = {:chat, channel}
+ id = SData.term_hash manifest
+
+ case Shard.Manager.register(id, manifest, self()) do
+ :ok ->
+ Shard.Manager.dispatch_to(id, nil, self())
+ {:ok, block_store} = SApp.BlockStore.start_link(id, :block_store)
+ mst = %MST{store: %SApp.BlockStore{pid: block_store},
+ cmp: &msg_cmp/2}
+ GenServer.cast(self(), :init_pull)
+ {:ok,
+ %{channel: channel,
+ id: id,
+ manifest: manifest,
+ block_store: block_store,
+ mst: mst,
+ subs: MapSet.new,
+ }
+ }
+ :redundant ->
+ exit(:redundant)
+ end
+ end
+
+ @doc """
+ Implementation of the :manifest call that returns the chat room's manifest
+ """
+ def handle_call(:manifest, _from, state) do
+ {:reply, state.manifest, state}
+ end
+
+ def handle_call({:read_history, top_bound, num}, _from, state) do
+ ret = MST.last(state.mst, top_bound, num)
+ {:reply, ret, state}
+ end
+
+ @doc """
+ Implementation of the :init_pull handler, which is called when the
+ process starts. It contacts all currently connected peers and asks them to
+ send data for this channel if they have some.
+ """
+ def handle_cast(:init_pull, state) do
+ for {_, pid, _, _} <- :ets.tab2list(:peer_db) do
+ GenServer.cast(pid, {:send_msg, {:interested, [state.id]}})
+ end
+ {:noreply, state}
+ end
+
+ @doc """
+ Implementation of the :chat_send handler. This is the main handler that is used
+ to send a message to the chat room. Puts the message in the store and syncs
+ with all connected peers.
+ """
+ def handle_cast({:chat_send, msg}, state) do
+ msgitem = {(System.os_time :seconds),
+ Shard.Identity.get_nickname(),
+ msg}
+ prev_root = state.mst.root
+ mst = MST.insert(state.mst, msgitem)
+ state = %{state | mst: mst}
+
+ for pid <- state.subs do
+ if Process.alive?(pid) do
+ send(pid, {:chat_send, state.channel, msgitem})
+ end
+ end
+
+ notif = {:append, prev_root, msgitem, mst.root}
+ for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do
+ Shard.Manager.send(peer_id, {state.id, nil, notif})
+ end
+
+ {:noreply, state}
+ end
+
+ @doc """
+  Implementation of the :interested handler; this is called when a peer we are
+  connected to asks to receive data for this channel.
+ """
+ def handle_cast({:interested, peer_id}, state) do
+ Shard.Manager.send(peer_id, {state.id, nil, {:root, state.mst.root}})
+ {:noreply, state}
+ end
+
+ def handle_cast({:subscribe, pid}, state) do
+ Process.monitor(pid)
+ new_subs = MapSet.put(state.subs, pid)
+ {:noreply, %{ state | subs: new_subs }}
+ end
+
+ @doc """
+  Implementation of the :msg handler, which is the main handler for messages
+  coming from other peers concerning this chat room.
+
+  Messages are:
+  - `{:get_manifest}`: request this chat room's manifest
+  - `{:append, prev_root, msgitem, new_root}`: a peer appended a single message
+  - `{:root, new_root}`: a peer advertises the current root of its message tree.
+ """
+ def handle_cast({:msg, peer_id, _shard_id, nil, msg}, state) do
+ state = case msg do
+ {:get_manifest} ->
+ Shard.Manager.send(peer_id, {state.id, nil, {:manifest, state.manifest}})
+ state
+ {:append, prev_root, msgitem, new_root} ->
+        # Append message: a single message has arrived
+ if new_root == state.mst.root do
+ # We already have the message, do nothing
+ state
+ else
+ # Try adding the message
+ if prev_root == state.mst.root do
+ mst2 = MST.insert(state.mst, msgitem)
+ if mst2.root == new_root do
+ # This was the only message missing, we are happy!
+ state = %{state | mst: mst2}
+ msg_callback(state, msgitem)
+ state
+ else
+ # More messages are missing, start a full merge
+ init_merge(state, new_root, peer_id)
+ end
+ else
+ init_merge(state, new_root, peer_id)
+ end
+ end
+ {:root, new_root} ->
+ if new_root == state.mst.root do
+ # already up to date, ignore
+ state
+ else
+ init_merge(state, new_root, peer_id)
+ end
+ x ->
+ Logger.info("Unhandled message: #{inspect x}")
+ state
+ end
+ {:noreply, state}
+ end
+
+ defp init_merge(state, new_root, source_peer) do
+ # TODO: make the merge asynchronous
+ mgmst = %{state.mst | root: new_root}
+ mgmst = put_in(mgmst.store.prefer_ask, [source_peer])
+ mst = MST.merge(state.mst, mgmst, fn msgitem, true -> msg_callback(state, msgitem) end)
+ %{state | mst: mst}
+ end
+
+ def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
+ new_subs = MapSet.delete(state.subs, pid)
+ {:noreply, %{ state | subs: new_subs }}
+ end
+
+ defp msg_callback(state, {ts, nick, msg}) do
+ for pid <- state.subs do
+ if Process.alive?(pid) do
+ send(pid, {:chat_recv, state.channel, {ts, nick, msg}})
+ end
+ end
+ end
+
+ defp msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do
+ cond do
+ ts1 > ts2 -> :after
+ ts1 < ts2 -> :before
+ nick1 > nick2 -> :after
+ nick1 < nick2 -> :before
+ msg1 > msg2 -> :after
+ msg1 < msg2 -> :before
+ true -> :duplicate
+ end
+ end
+end
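For reference, a sketch (not part of the commit) of driving a chat shard from `iex`, mirroring what `SCLI` does with `/join`, plain message input and `/hist`:

```elixir
# Sketch: join a channel, subscribe to notifications, post a message and read history.
{:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "test"})
GenServer.cast(pid, {:subscribe, self()})   # we will receive {:chat_recv, chan, {ts, nick, msg}}
GenServer.cast(pid, {:chat_send, "hello"})
history = GenServer.call(pid, {:read_history, nil, 25})
# history is a list of {{timestamp, nickname, message}, true} tuples
```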
diff --git a/shard/lib/application.ex b/shard/lib/application.ex
new file mode 100644
index 0000000..3e3a6ac
--- /dev/null
+++ b/shard/lib/application.ex
@@ -0,0 +1,33 @@
+defmodule Shard.Application do
+ @moduledoc """
+ Main Shard application.
+
+  Shard is a prototype peer-to-peer communication platform with data
+ synchronization.
+ """
+
+ use Application
+
+ def start(_type, _args) do
+ import Supervisor.Spec, warn: false
+
+ {listen_port, _} = Integer.parse ((System.get_env "PORT") || "4044")
+
+ # Define workers and child supervisors to be supervised
+ children = [
+ Shard.Identity,
+ { DynamicSupervisor, strategy: :one_for_one, name: Shard.DynamicSupervisor },
+
+ # Networking
+ { SNet.TCPServer, listen_port },
+
+ # Applications & data store
+ { Shard.Manager, listen_port },
+ ]
+
+ # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
+ # for other strategies and supported options
+ opts = [strategy: :one_for_one, name: Shard.Supervisor]
+ Supervisor.start_link(children, opts)
+ end
+end
diff --git a/shard/lib/cli/cli.ex b/shard/lib/cli/cli.ex
new file mode 100644
index 0000000..2fbf8c2
--- /dev/null
+++ b/shard/lib/cli/cli.ex
@@ -0,0 +1,98 @@
+defmodule SCLI do
+ @moduledoc """
+ Small command line interface for the chat application
+ """
+
+ def run() do
+ run(nil)
+ end
+
+ defp run(pid) do
+ handle_messages()
+
+ nick = Shard.Identity.get_nickname
+ prompt = case pid do
+ nil -> "(no channel) #{nick}: "
+ _ ->
+ {:chat, chan} = GenServer.call(pid, :manifest)
+ "##{chan} #{nick}: "
+ end
+
+ str = prompt |> IO.gets |> String.trim
+ cond do
+ str == "/quit" ->
+ nil
+ String.slice(str, 0..0) == "/" ->
+ command = str |> String.slice(1..-1) |> String.split(" ")
+ pid2 = handle_command(pid, command)
+ run(pid2)
+ true ->
+ if str != "" do
+ GenServer.cast(pid, {:chat_send, str})
+ end
+ run(pid)
+ end
+ end
+
+ defp handle_messages() do
+ receive do
+ {:chat_recv, chan, {ts, nick, msg}} ->
+ IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} ##{chan} <#{nick}> #{msg}"
+ handle_messages()
+ {:chat_send, _, _} ->
+ # do nothing
+ handle_messages()
+ after 10 -> nil
+ end
+ end
+
+ defp handle_command(pid, ["connect", ipstr, portstr]) do
+ {:ok, ip} = :inet.parse_address (to_charlist ipstr)
+ {port, _} = Integer.parse portstr
+ Shard.Manager.add_peer(ip, port)
+ pid
+ end
+
+ defp handle_command(pid, ["list"]) do
+ IO.puts "List of known channels:"
+
+ for {_chid, manifest, _chpid} <- :ets.tab2list(:shard_db) do
+ {:chat, chan} = manifest
+ IO.puts "##{chan}"
+ end
+ pid
+ end
+
+ defp handle_command(pid, ["hist"]) do
+ GenServer.call(pid, {:read_history, nil, 25})
+ |> Enum.each(fn {{ts, nick, msg}, true} ->
+ IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}"
+ end)
+ pid
+ end
+
+ defp handle_command(_pid, ["join", qchan]) do
+ list = for {_chid, manifest, chpid} <- :ets.tab2list(:shard_db),
+ {:chat, chan} = manifest,
+ do: {chan, chpid}
+ case List.keyfind(list, qchan, 0) do
+ nil ->
+ {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, qchan})
+ GenServer.cast(pid, {:subscribe, self()})
+ pid
+ {_, pid} ->
+ IO.puts "Switching to ##{qchan}"
+ pid
+ end
+ end
+
+ defp handle_command(pid, ["nick", nick]) do
+ Shard.Identity.set_nickname nick
+ pid
+ end
+
+ defp handle_command(pid, _cmd) do
+ IO.puts "Invalid command"
+ pid
+ end
+end
diff --git a/shard/lib/data/data.ex b/shard/lib/data/data.ex
new file mode 100644
index 0000000..c2c659d
--- /dev/null
+++ b/shard/lib/data/data.ex
@@ -0,0 +1,49 @@
+defmodule SData do
+ @moduledoc """
+ Utility functions
+
+  Compare functions are functions that compare stored items and provide a total order.
+ They must return:
+ - `:after` if the first argument is more recent
+ - `:duplicate` if the two items are the same
+ - `:before` if the first argument is older
+ These functions must only return :duplicate for equal items.
+ """
+
+ @doc """
+ Calculate the hash of an Erlang term by first converting it to its
+ binary representation.
+ """
+ def term_hash(term, algo \\ :sha256) do
+ :crypto.hash(algo, (:erlang.term_to_binary term))
+ end
+
+ @doc"""
+ Compare function for arbitrary terms using the Erlang order
+ """
+ def cmp_term(a, b) do
+ cond do
+ a > b -> :after
+ a < b -> :before
+ a == b -> :duplicate
+ end
+ end
+
+ @doc"""
+ Compare function for timestamped strings
+ """
+ def cmp_ts_str({ts1, str1}, {ts2, str2}) do
+ cond do
+ ts1 > ts2 -> :after
+ ts1 < ts2 -> :before
+ str1 > str2 -> :after
+ str1 < str2 -> :before
+ true -> :duplicate
+ end
+ end
+
+ @doc"""
+  Trivial merge function for `true` values (used when a tree is used as a set)
+ """
+ def merge_true(true, true), do: true
+end
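The compare-function contract described above can be summed up with a few concrete calls (sketch, not part of the commit):

```elixir
# Sketch: the total order returned by the SData compare helpers.
:after = SData.cmp_term(2, 1)
:before = SData.cmp_ts_str({5, "abc"}, {7, "abc"})   # older timestamp sorts before
:duplicate = SData.cmp_term(:x, :x)
```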
diff --git a/shard/lib/data/merklelist.ex b/shard/lib/data/merklelist.ex
new file mode 100644
index 0000000..9b44ee8
--- /dev/null
+++ b/shard/lib/data/merklelist.ex
@@ -0,0 +1,143 @@
+defmodule SData.MerkleList do
+ @moduledoc"""
+ A simple Merkle list store.
+
+ Future improvements:
+ - When messages are inserted other than at the top, all intermediate hashes
+ change. Keep track of the mapping from old hashes to new hashes so that get
+ requests can work even for hashes that are not valid anymore.
+ - group items in "pages" (bigger bundles)
+ """
+
+ defstruct [:root, :top, :cmp, :store]
+
+ @doc"""
+ Create a Merkle list store.
+
+ `cmp` is a compare function that respects the interface defined in module `SData`.
+ """
+ def new(cmp) do
+ root_item = :root
+ root_hash = SData.term_hash root_item
+ state = %SData.MerkleList{
+ root: root_hash,
+ top: root_hash,
+ cmp: cmp,
+ store: %{ root_hash => root_item }
+ }
+ state
+ end
+
+ defp push(state, item) do
+ new_item = {item, state.top}
+ new_item_hash = SData.term_hash new_item
+ new_store = Map.put(state.store, new_item_hash, new_item)
+ %{ state | :top => new_item_hash, :store => new_store }
+ end
+
+ defp pop(state) do
+ if state.top == state.root do
+ :error
+ else
+ {item, next} = Map.get(state.store, state.top)
+ new_store = Map.delete(state.store, state.top)
+ new_state = %{ state | :top => next, :store => new_store }
+ {:ok, item, new_state}
+ end
+ end
+
+ @doc"""
+ Insert a list of items in the store.
+
+ A callback function may be specified that is called on any item
+  that is successfully added, i.e. that wasn't present in the store before.
+ """
+ def insert_many(state, items, callback \\ (fn _ -> nil end)) do
+ items_sorted = Enum.sort(items, fn (x, y) -> state.cmp.(x, y) == :after end)
+ insert_many_aux(state, items_sorted, callback)
+ end
+
+ defp insert_many_aux(state, [], _callback) do
+ state
+ end
+
+ defp insert_many_aux(state, [item | rest], callback) do
+ case pop(state) do
+ :error ->
+ new_state = push(insert_many_aux(state, rest, callback), item)
+ callback.(item)
+ new_state
+ {:ok, front, state_rest} ->
+ case state.cmp.(item, front) do
+ :after ->
+ new_state = push(insert_many_aux(state, rest, callback), item)
+ callback.(item)
+ new_state
+ :duplicate -> insert_many_aux(state, rest, callback)
+ :before -> push(insert_many_aux(state_rest, [item | rest], callback), front)
+ end
+ end
+ end
+
+ @doc"""
+ Insert a single item in the store.
+
+ A callback function may be specified that is called on the item
+  if it is successfully added, i.e. it wasn't present in the store before.
+ """
+ def insert(state, item, callback \\ (fn _ -> nil end)) do
+ insert_many(state, [item], callback)
+ end
+
+ @doc"""
+ Read some items from the state.
+
+ The two parameters are optional:
+ - qbegin : hash of the first item to read
+ - qlimit : number of items to read
+ """
+ def read(state, qbegin \\ nil, qlimit \\ nil) do
+ begin = qbegin || state.top
+ limit = qlimit || 20
+ get_items_list(state, begin, limit)
+ end
+
+ @doc"""
+ Get the hash of the last item
+ """
+ def top(state) do
+ state.top
+ end
+
+ @doc"""
+ Get the hash of the root item
+ """
+ def root(state) do
+ state.root
+ end
+
+ @doc"""
+ Check if the store holds a certain item
+ """
+ def has(state, hash) do
+ Map.has_key?(state.store, hash)
+ end
+
+ defp get_items_list(state, begin, limit) do
+ case limit do
+ 0 -> {:ok, [], begin}
+ _ ->
+ case Map.fetch(state.store, begin) do
+ {:ok, :root} ->
+ {:ok, [], nil }
+ {:ok, {item, next}} ->
+ case get_items_list(state, next, limit - 1) do
+ {:ok, rest, past} ->
+ {:ok, [ item | rest ], past }
+ {:error, reason} -> {:error, reason}
+ end
+ :error -> {:error, begin}
+ end
+ end
+ end
+end
diff --git a/shard/lib/data/merklesearchtree.ex b/shard/lib/data/merklesearchtree.ex
new file mode 100644
index 0000000..941d31d
--- /dev/null
+++ b/shard/lib/data/merklesearchtree.ex
@@ -0,0 +1,387 @@
+defmodule SData.MerkleSearchTree do
+ @moduledoc"""
+ A Merkle search tree.
+
+  A node of the tree is
+  {
+    level,
+    hash_of_low_subnode | nil,
+    [
+      { key, value, hash_of_subnode | nil },
+      { key, value, hash_of_subnode | nil },
+      ...
+    ]
+  }
+ """
+
+ alias SData.PageStore, as: Store
+
+ @doc"""
+ Create a new Merkle search tree.
+
+ This structure can be used as a set with only true keys,
+ or as a map if a merge function is given.
+
+ `cmp` is a compare function for keys as defined in module `SData`.
+
+ `merge` is a function for merging two items that have the same key.
+ """
+ defstruct root: nil,
+ store: SData.LocalStore.new,
+ cmp: &SData.cmp_term/2,
+ merge: &SData.merge_true/2
+
+
+ defmodule Page do
+ defstruct [:level, :low, :list]
+ end
+
+ defimpl SData.Page, for: Page do
+ def refs(page) do
+ refs = for {_, _, h} <- page.list, h != nil, do: h
+ if page.low != nil do
+ [ page.low | refs ]
+ else
+ refs
+ end
+ end
+ end
+
+
+ @doc"""
+ Insert an item into the search tree.
+ """
+ def insert(state, key, value \\ true) do
+ level = calc_level(key)
+ {hash, store} = insert_at(state, state.store, state.root, key, level, value)
+ %{ state | root: hash, store: store }
+ end
+
+ defp insert_at(s, store, root, key, level, value) do
+ {new_page, store} = if root == nil do
+ { %Page{ level: level, low: nil, list: [ { key, value, nil } ] }, store }
+ else
+ %Page{ level: plevel, low: low, list: lst } = Store.get(store, root)
+ [ { k0, _, _} | _ ] = lst
+ cond do
+ plevel < level ->
+ {low, high, store} = split(s, store, root, key)
+ { %Page{ level: level, low: low, list: [ { key, value, high } ] }, store }
+ plevel == level ->
+ store = Store.free(store, root)
+ case s.cmp.(key, k0) do
+ :before ->
+ {low2a, low2b, store} = split(s, store, low, key)
+ { %Page{ level: level, low: low2a, list: [ { key, value, low2b } | lst] }, store }
+ _ ->
+ {new_lst, store} = aux_insert_after_first(s, store, lst, key, value)
+ { %Page{ level: plevel, low: low, list: new_lst }, store }
+ end
+ plevel > level ->
+ store = Store.free(store, root)
+ case s.cmp.(key, k0) do
+ :before ->
+ {new_low, store} = insert_at(s, store, low, key, level, value)
+ { %Page{ level: plevel, low: new_low, list: lst }, store }
+ :after ->
+ {new_lst, store} = aux_insert_sub_after_first(s, store, lst, key, level, value)
+ { %Page{ level: plevel, low: low, list: new_lst }, store }
+ end
+ end
+ end
+ Store.put(store, new_page) # returns {hash, store}
+ end
+
+ defp split(s, store, hash, key) do
+ if hash == nil do
+ {nil, nil, store}
+ else
+ %Page{ level: level, low: low, list: lst } = Store.get(store, hash) || Store.get(s.store, hash)
+ store = Store.free(store, hash)
+ [ { k0, _, _} | _ ] = lst
+ case s.cmp.(key, k0) do
+ :before ->
+ {lowlow, lowhi, store} = split(s, store, low, key)
+ newp2 = %Page{ level: level, low: lowhi, list: lst }
+ {newp2h, store} = Store.put(store, newp2)
+ {lowlow, newp2h, store}
+ :after ->
+ {lst1, p2, store} = split_aux(s, store, lst, key, level)
+ newp1 = %Page{ level: level, low: low, list: lst1}
+ {newp1h, store} = Store.put(store, newp1)
+ {newp1h, p2, store}
+ end
+ end
+ end
+
+ defp split_aux(s, store, lst, key, level) do
+ case lst do
+ [ {k1, v1, r1} ] ->
+ if s.cmp.(k1, key) == :duplicate do
+ raise "Bad logic"
+ end
+ {r1l, r1h, store} = split(s, store, r1, key)
+ { [{k1, v1, r1l}], r1h, store }
+ [ {k1, v1, r1} = fst, {k2, v2, r2} = rst1 | rst ] ->
+ case s.cmp.(key, k2) do
+ :before ->
+ {r1l, r1h, store} = split(s, store, r1, key)
+ p2 = %Page{ level: level, low: r1h, list: [ {k2, v2, r2} | rst ] }
+ {p2h, store} = Store.put(store, p2)
+ { [{k1, v1, r1l}], p2h, store }
+ :after ->
+ {rst2, hi, store} = split_aux(s, store, [rst1 | rst], key, level)
+ { [ fst | rst2 ], hi, store }
+ :duplicate ->
+ raise "Bad logic"
+ end
+ end
+ end
+
+ defp aux_insert_after_first(s, store, lst, key, value) do
+ case lst do
+ [ {k1, v1, r1} ] ->
+ case s.cmp.(k1, key) do
+ :duplicate ->
+ { [ {k1, s.merge.(v1, value), r1} ], store }
+ :before ->
+ {r1a, r1b, new_store} = split(s, store, r1, key)
+ { [ {k1, v1, r1a}, {key, value, r1b} ], new_store }
+ end
+ [ {k1, v1, r1} = fst, {k2, v2, r2} = rst1 | rst ] ->
+ case s.cmp.(k1, key) do
+ :duplicate ->
+ { [ {k1, s.merge.(v1, value), r1}, rst1 | rst ], store }
+ :before ->
+ case s.cmp.(k2, key) do
+ :after ->
+ {r1a, r1b, new_store} = split(s, store, r1, key)
+ { [ {k1, v1, r1a}, {key, value, r1b}, {k2, v2, r2} | rst ], new_store }
+ _ ->
+ {rst2, new_store} = aux_insert_after_first(s, store, [rst1 | rst], key, value)
+ { [ fst | rst2 ], new_store }
+ end
+ end
+ end
+ end
+
+ defp aux_insert_sub_after_first(s, store, lst, key, level, value) do
+ case lst do
+ [ {k1, v1, r1} ] ->
+ if s.cmp.(k1, key) == :duplicate do
+ raise "Bad logic"
+ end
+ {r1new, store_new} = insert_at(s, store, r1, key, level, value)
+ { [{k1, v1, r1new}], store_new }
+ [ {k1, v1, r1} = fst, {k2, _, _} = rst1 | rst ] ->
+ if s.cmp.(k1, key) == :duplicate do
+ raise "Bad logic"
+ end
+ case s.cmp.(key, k2) do
+ :before ->
+ {r1new, store_new} = insert_at(s, store, r1, key, level, value)
+ { [{k1, v1, r1new}, rst1 | rst], store_new }
+ _ ->
+ {rst2, new_store} = aux_insert_sub_after_first(s, store, [rst1 |rst], key, level, value)
+ { [ fst | rst2 ], new_store }
+ end
+ end
+ end
+
+ @doc"""
+ Merge values from another MST in this MST.
+
+ The merge is not symmetrical in the sense that:
+ - new pages are added in the store of the first argument
+ - the callback is called for all items found in the second argument and not the first
+ """
+ def merge(to, from, callback \\ fn _, _ -> nil end) do
+ { store, root } = merge_aux(to, from, to.store, to.root, from.root, callback)
+ %{ to | store: store, root: root }
+ end
+
+ defp merge_aux(s1, s2, store, r1, r2, callback) do
+ case {r1, r2} do
+ _ when r1 == r2 -> { store, r1 }
+ {_, nil} ->
+ { store, r1 }
+ {nil, _} ->
+ store = Store.copy(store, s2.store, r2)
+ rec_callback(store, r2, callback)
+ { store, r2 }
+ _ ->
+ %Page{ level: level1, low: low1, list: lst1 } = Store.get(store, r1)
+ %Page{ level: level2, low: low2, list: lst2 } = Store.get(store, r2) || Store.get(s2.store, r2)
+ { level, low1, lst1, low2, lst2 } = cond do
+ level1 == level2 -> {level1, low1, lst1, low2, lst2}
+ level1 > level2 -> {level1, low1, lst1, r2, []}
+ level2 > level1 -> {level2, r1, [], low2, lst2}
+ end
+ { store, low, lst } = merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback)
+ page = %Page{ level: level, low: low, list: lst }
+ {hash, store} = Store.put(store, page)
+ {store, hash}
+ end
+ end
+
+ defp merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback) do
+ case {lst1, lst2} do
+ { [], [] } ->
+ {store, hash} = merge_aux(s1, s2, store, low1, low2, callback)
+ {store, hash, []}
+ { [], [ {k, v, r} | rst2 ] } ->
+ {low1l, low1h, store} = split(s1, store, low1, k)
+ {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback)
+ callback.(k, v)
+ {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, [], r, rst2, callback)
+ {store, newlow, [ {k, v, newr} | newrst ]}
+ { [ {k, v, r} | rst1 ], [] } ->
+ {low2l, low2h, store} = split(s2, store, low2, k)
+ {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback)
+ {store, newr, newrst} = merge_aux_rec(s1, s2, store, r, rst1, low2h, [], callback)
+ {store, newlow, [ {k, v, newr} | newrst ]}
+ { [ {k1, v1, r1} | rst1 ], [ {k2, v2, r2} | rst2 ] } ->
+ case s1.cmp.(k1, k2) do
+ :before ->
+ {low2l, low2h, store} = split(s2, store, low2, k1)
+ {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback)
+ {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, low2h, lst2, callback)
+ {store, newlow, [ {k1, v1, newr} | newrst ]}
+ :after ->
+ {low1l, low1h, store} = split(s1, store, low1, k2)
+ {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback)
+ callback.(k2, v2)
+ {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, lst1, r2, rst2, callback)
+ {store, newlow, [ {k2, v2, newr} | newrst ]}
+ :duplicate ->
+ {store, newlow} = merge_aux(s1, s2, store, low1, low2, callback)
+ newv = s1.merge.(v1, v2) ## TODO: callback here ??
+ {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, r2, rst2, callback)
+ {store, newlow, [ {k1, newv, newr} | newrst ]}
+ end
+ end
+ end
+
+ defp rec_callback(store, root, callback) do
+ case root do
+ nil -> nil
+ _ ->
+ %Page{ level: _, low: low, list: lst } = Store.get(store, root)
+ rec_callback(store, low, callback)
+ for {k, v, rst} <- lst do
+ callback.(k, v)
+ rec_callback(store, rst, callback)
+ end
+ end
+ end
+
+ @doc"""
+ Get value for a specific key in search tree.
+ """
+ def get(state, key) do
+ get(state, state.root, key)
+ end
+
+ defp get(s, root, key) do
+ case root do
+ nil -> nil
+ _ ->
+ %Page{ level: _, low: low, list: lst } = Store.get(s.store, root)
+ get_aux(s, low, lst, key)
+ end
+ end
+
+ defp get_aux(s, low, lst, key) do
+ case lst do
+ [] ->
+ get(s, low, key)
+ [ {k, v, low2} | rst ] ->
+ case s.cmp.(key, k) do
+ :duplicate -> v
+ :before ->
+ get(s, low, key)
+ :after ->
+ get_aux(s, low2, rst, key)
+ end
+ end
+ end
+
+ @doc"""
+ Get the last n items of the tree, or the last n items
+ strictly before given upper bound if non nil
+ """
+ def last(state, top_bound, num) do
+ last(state, state.root, top_bound, num)
+ end
+
+ defp last(s, root, top_bound, num) do
+ case root do
+ nil -> []
+ _ ->
+ %Page{ level: _, low: low, list: lst } = Store.get(s.store, root)
+ last_aux(s, low, lst, top_bound, num)
+ end
+ end
+
+ defp last_aux(s, low, lst, top_bound, num) do
+ case lst do
+ [] ->
+ last(s, low, top_bound, num)
+ [ {k, v, low2} | rst ] ->
+ if top_bound == nil or s.cmp.(top_bound, k) == :after do
+ items = last_aux(s, low2, rst, top_bound, num)
+ items = if Enum.count(items) < num do
+ [ {k, v} | items ]
+ else
+ items
+ end
+ cnt = Enum.count items
+ if cnt < num do
+ last(s, low, top_bound, num - cnt) ++ items
+ else
+ items
+ end
+ else
+ last(s, low, top_bound, num)
+ end
+ end
+ end
+
+
+ @doc"""
+ Dump Merkle search tree structure.
+ """
+ def dump(state) do
+ dump(state.store, state.root, "")
+ end
+
+ defp dump(store, root, lvl) do
+ case root do
+ nil ->
+ IO.puts(lvl <> "nil")
+ _ ->
+ %Page{ level: level, low: low, list: lst} = Store.get(store, root)
+ IO.puts(lvl <> "#{root|>Base.encode16} (#{level})")
+ dump(store, low, lvl <> " ")
+ for {k, v, r} <- lst do
+ IO.puts(lvl<>"- #{inspect k} => #{inspect v}")
+ dump(store, r, lvl <> " ")
+ end
+ end
+ end
+
+ defp calc_level(key) do
+ key
+ |> SData.term_hash
+ |> Base.encode16
+ |> String.to_charlist
+ |> count_leading_zeroes
+ end
+
+ defp count_leading_zeroes('0' ++ rest) do
+ 1 + count_leading_zeroes(rest)
+ end
+ defp count_leading_zeroes(_) do
+ 0
+ end
+end
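As a side note (editor's sketch, not part of the commit), the level computed by `calc_level/1` is the number of leading `'0'` hex digits of the key's hash, so roughly one key in 16 lands at level >= 1, one in 256 at level >= 2, and so on:

```elixir
# Sketch: an Enum-based equivalent of calc_level/1, for illustration only.
level =
  "some key"
  |> SData.term_hash()
  |> Base.encode16()
  |> String.graphemes()
  |> Enum.take_while(&(&1 == "0"))
  |> length()
```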
diff --git a/shard/lib/data/store.ex b/shard/lib/data/store.ex
new file mode 100644
index 0000000..ca12cd0
--- /dev/null
+++ b/shard/lib/data/store.ex
@@ -0,0 +1,91 @@
+defprotocol SData.Page do
+ @moduledoc"""
+ Protocol to be implemented by objects that are used as data pages
+ in a pagestore and that may reference other data pages by their hash.
+ """
+
+ @fallback_to_any true
+
+ @doc"""
+ Get hashes of all pages referenced by this page.
+ """
+ def refs(page)
+end
+
+defimpl SData.Page, for: Any do
+ def refs(_page), do: []
+end
+
+
+defprotocol SData.PageStore do
+ @moduledoc"""
+ Protocol to be implemented for page stores to allow their
+ manipulation.
+
+ This protocol may also be implemented by store proxies that track
+ operations and implement different synchronization or caching mechanisms.
+ """
+
+ @doc"""
+ Put a page. Argument is the content of the page, returns the
+ hash that the store has associated to it.
+
+ Returns {hash, store}
+ """
+ def put(store, page)
+
+ @doc"""
+ Get a page referenced by its hash.
+
+ Returns page
+ """
+ def get(store, hash)
+
+ @doc"""
+ Copy to the store a page and all its references from the other store.
+ In the case of pages on the network in a distributed store, this may
+ be lazy.
+
+ Returns store
+ """
+ def copy(store, other_store, hash)
+
+ @doc"""
+ Free a page referenced by its hash, marking it as no longer needed.
+
+ Returns store
+ """
+ def free(store, hash)
+end
+
+
+defmodule SData.LocalStore do
+ defstruct [:pages]
+
+ def new() do
+ %SData.LocalStore{ pages: %{} }
+ end
+end
+
+defimpl SData.PageStore, for: SData.LocalStore do
+ def put(store, page) do
+ hash = SData.term_hash page
+ store = %{ store | pages: Map.put(store.pages, hash, page) }
+ { hash, store }
+ end
+
+ def get(store, hash) do
+ store.pages[hash]
+ end
+
+ def copy(store, other_store, hash) do
+ page = SData.PageStore.get(other_store, hash)
+ refs = SData.Page.refs(page)
+ store = Enum.reduce(refs, store, fn x, acc -> copy(acc, other_store, x) end)
+ %{ store | pages: Map.put(store.pages, hash, page) }
+ end
+
+ def free(store, hash) do
+ %{ store | pages: Map.delete(store.pages, hash) }
+ end
+end
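A minimal sketch (not part of the commit) of the `SData.PageStore` contract, using the in-memory `SData.LocalStore` defined above:

```elixir
# Sketch: put returns {hash, store}, get returns the page, free drops it.
store = SData.LocalStore.new()
{hash, store} = SData.PageStore.put(store, %{hello: "world"})
%{hello: "world"} = SData.PageStore.get(store, hash)
store = SData.PageStore.free(store, hash)
nil = SData.PageStore.get(store, hash)
```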
diff --git a/shard/lib/identity.ex b/shard/lib/identity.ex
new file mode 100644
index 0000000..f1899df
--- /dev/null
+++ b/shard/lib/identity.ex
@@ -0,0 +1,46 @@
+defmodule Shard.Identity do
+ use Agent
+ require Salty.Sign.Ed25519, as: Sign
+ require Logger
+
+ def start_link(_) do
+ Agent.start_link(__MODULE__, :init, [], name: __MODULE__)
+ end
+
+ def init() do
+ Logger.info "Generating keypair..."
+ {pk, sk} = gen_keypair(Application.get_env(:shard, :peer_id_suffix))
+ nick_suffix = pk
+ |> binary_part(0, 3)
+ |> Base.encode16
+ |> String.downcase
+ %{
+ keypair: {pk, sk},
+ nickname: "Anon" <> nick_suffix,
+ }
+ end
+
+ defp gen_keypair(suffix, n \\ 0) do
+ {:ok, pk, sk} = Sign.keypair
+ if rem(n, 10000) == 0 do
+ Logger.info "#{n}... expected #{:math.pow(256, byte_size(suffix))}"
+ end
+ if :binary.longest_common_suffix([pk, suffix]) == byte_size(suffix) do
+ {pk, sk}
+ else
+ gen_keypair(suffix, n+1)
+ end
+ end
+
+ def get_keypair() do
+ Agent.get(__MODULE__, &(&1.keypair))
+ end
+
+ def get_nickname() do
+ Agent.get(__MODULE__, &(&1.nickname))
+ end
+
+ def set_nickname(newnick) do
+ Agent.update(__MODULE__, &(%{&1 | nickname: newnick}))
+ end
+end
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex
new file mode 100644
index 0000000..82984d6
--- /dev/null
+++ b/shard/lib/manager.ex
@@ -0,0 +1,243 @@
+defmodule Shard.Manager do
+ @moduledoc"""
+  Maintains several important tables:
+
+ - :peer_db
+
+ List of
+ { id, {conn_pid, con_start, conn_n_msg} | nil, ip, port, last_seen }
+
+ - :shard_db
+
+ List of
+ { id, manifest, pid | nil }
+
+ - :shard_procs
+
+ List of
+ { {id, path}, pid }
+
+ - :shard_peer_db
+
+    Multi-list of
+    { shard_id, peer_id }
+
+
+  And an internal table:
+
+ - :outbox
+
+ Multi-list of
+ { peer_id, message }
+
+ """
+
+ use GenServer
+
+ require Logger
+
+ def start_link(my_port) do
+ GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
+ end
+
+ def init(my_port) do
+ :ets.new(:peer_db, [:set, :protected, :named_table])
+ :ets.new(:shard_db, [:set, :protected, :named_table])
+ :ets.new(:shard_procs, [:set, :protected, :named_table])
+ :ets.new(:shard_peer_db, [:bag, :protected, :named_table])
+ outbox = :ets.new(:outbox, [:bag, :private])
+
+ {:ok, %{my_port: my_port, outbox: outbox} }
+ end
+
+ def handle_call({:register, shard_id, manifest, pid}, _from, state) do
+ will_live = case :ets.lookup(:shard_db, shard_id) do
+ [{ ^shard_id, _, pid }] -> not Process.alive?(pid)
+ _ -> true
+ end
+ reply = if will_live do
+ Process.monitor(pid)
+ :ets.insert(:shard_db, {shard_id, manifest, pid})
+ :ok
+ else
+ :redundant
+ end
+ {:reply, reply, state}
+ end
+
+ def handle_cast({:dispatch_to, shard_id, path, pid}, state) do
+ :ets.insert(:shard_procs, { {shard_id, path}, pid })
+ Process.monitor(pid)
+ {:noreply, state}
+ end
+
+ def handle_cast({:interested, peer_id, shards}, state) do
+ for shard_id <- shards do
+ case :ets.lookup(:shard_db, shard_id) do
+ [{ ^shard_id, _, pid }] ->
+ :ets.insert(:shard_peer_db, {shard_id, peer_id})
+ GenServer.cast(pid, {:interested, peer_id})
+ [] -> nil
+ end
+ end
+ {:noreply, state}
+ end
+
+ def handle_cast({:not_interested, peer_id, shard_id}, state) do
+ :ets.match_delete(:shard_peer_db, {shard_id, peer_id})
+ {:noreply, state}
+ end
+
+ def handle_cast({:shard_peer_db_insert, shard_id, peer_id}, state) do
+ :ets.insert(:shard_peer_db, {shard_id, peer_id})
+ {:noreply, state}
+ end
+
+ def handle_cast({:peer_up, pk, pid, ip, port}, state) do
+ for [pk2] <- :ets.match(:peer_db, {:'$1', :_, ip, port}) do
+ if pk2 != pk do
+ # obsolete peer information
+ :ets.delete(:peer_db, pk2)
+ :ets.match_delete(:shard_peer_db, {:_, pk2})
+ end
+ end
+ :ets.insert(:peer_db, {pk, pid, ip, port})
+
+ # Send interested message for all our shards
+ id_list = (for {id, _, _} <- :ets.tab2list(:shard_db), do: id)
+ GenServer.cast(pid, {:send_msg, {:interested, id_list}})
+
+ # Send queued messages
+ for {_, msg, _} <- :ets.lookup(state.outbox, pk) do
+ GenServer.cast(pid, {:send_msg, msg})
+ end
+ :ets.delete(state.outbox, pk)
+
+ {:noreply, state}
+ end
+
+ def handle_cast({:peer_down, pk, ip, port}, state) do
+ :ets.insert(:peer_db, {pk, nil, ip, port})
+ {:noreply, state}
+ end
+
+ def handle_cast({:connect_and_send, peer_id, msg}, state) do
+ case :ets.lookup(:peer_db, peer_id) do
+ [{^peer_id, nil, ip, port}] ->
+ add_peer(ip, port, state)
+ currtime = System.os_time :second
+ :ets.insert(state.outbox, {peer_id, msg, currtime})
+ outbox_cleanup = [{{:_, :_, :'$1'},
+ [{:<, :'$1', currtime - 60}],
+ [:'$1']}]
+ :ets.select_delete(state.outbox, outbox_cleanup)
+ _ ->
+ Logger.info "Dropping message #{inspect msg} for peer #{inspect peer_id}: peer not in database"
+ end
+ {:noreply, state}
+ end
+
+ def handle_cast({:try_connect, pk_list}, state) do
+ for pk <- pk_list do
+ case :ets.lookup(:peer_db, pk) do
+ [{^pk, nil, ip, port}] ->
+ add_peer(ip, port, state)
+ _ -> nil
+ end
+ end
+ {:noreply, state}
+ end
+
+ def handle_cast({:add_peer, ip, port}, state) do
+ add_peer(ip, port, state)
+ {:noreply, state}
+ end
+
+ def handle_info({:DOWN, _, :process, pid, _}, state) do
+ :ets.match_delete(:shard_procs, {:_, pid})
+ {:noreply, state}
+ end
+
+ defp add_peer(ip, port, state) do
+ spawn fn ->
+ case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do
+ {:ok, client} ->
+ {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}})
+ :ok = :gen_tcp.controlling_process(client, pid)
+ _ ->
+ Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped"
+ end
+ end
+ end
+
+
+ # ================
+ # PUBLIC INTERFACE
+ # ================
+
+
+ @doc"""
+ Connect to a peer specified by ip address and port
+ """
+ def add_peer(ip, port) do
+ GenServer.cast(__MODULE__, {:add_peer, ip, port})
+ end
+
+ @doc"""
+ Send message to a peer specified by peer id
+ """
+ def send(peer_id, msg) do
+ case :ets.lookup(:peer_db, peer_id) do
+ [{ ^peer_id, pid, _, _}] when pid != nil->
+ GenServer.cast(pid, {:send_msg, msg})
+ _ ->
+ GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg})
+ end
+ end
+
+ @doc"""
+ Dispatch incoming message to correct shard process
+ """
+ def dispatch(peer_id, {shard_id, path, msg}) do
+ case :ets.lookup(:shard_db, shard_id) do
+ [] ->
+ __MODULE__.send(peer_id, {:not_interested, shard_id})
+ [_] ->
+ case :ets.match(:shard_peer_db, {shard_id, peer_id}) do
+ [] ->
+ GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_id})
+ _ -> nil
+ end
+ case :ets.lookup(:shard_procs, {shard_id, path}) do
+ [{ {^shard_id, ^path}, pid }] ->
+ GenServer.cast(pid, {:msg, peer_id, shard_id, path, msg})
+ [] ->
+ Logger.info("Warning: dropping message for #{inspect shard_id}/#{inspect path}, no handler running.\n\t#{inspect msg}")
+ end
+ end
+ end
+
+ def dispatch(peer_id, {:interested, shards}) do
+ GenServer.cast(__MODULE__, {:interested, peer_id, shards})
+ end
+
+ def dispatch(peer_id, {:not_interested, shard}) do
+ GenServer.cast(__MODULE__, {:not_interested, peer_id, shard})
+ end
+
+ @doc"""
+ Register a process as the main process for a shard.
+
+ Returns either :ok or :redundant, in which case the process must exit.
+ """
+ def register(shard_id, manifest, pid) do
+ GenServer.call(__MODULE__, {:register, shard_id, manifest, pid})
+ end
+
+ @doc"""
+ Register a process as the handler for shard packets for a given path.
+ """
+ def dispatch_to(shard_id, path, pid) do
+ GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid})
+ end
+end
diff --git a/shard/lib/net/tcpconn.ex b/shard/lib/net/tcpconn.ex
new file mode 100644
index 0000000..44669bb
--- /dev/null
+++ b/shard/lib/net/tcpconn.ex
@@ -0,0 +1,106 @@
+defmodule SNet.TCPConn do
+ use GenServer, restart: :temporary
+ require Salty.Box.Curve25519xchacha20poly1305, as: Box
+ require Salty.Sign.Ed25519, as: Sign
+ require Logger
+
+ def start_link(state) do
+ GenServer.start_link(__MODULE__, state)
+ end
+
+ def init(state) do
+ GenServer.cast(self(), :handshake)
+ {:ok, state}
+ end
+
+ def handle_call(:get_host_str, _from, state) do
+ {:reply, "#{state.his_pkey|>Base.encode16|>String.downcase}@#{to_string(:inet_parse.ntoa(state.addr))}:#{state.port}", state}
+ end
+
+ def handle_cast(:handshake, state) do
+ socket = state.socket
+
+ {srv_pkey, srv_skey} = Shard.Identity.get_keypair
+ {:ok, sess_pkey, sess_skey} = Box.keypair
+ {:ok, challenge} = Salty.Random.buf 32
+
+ # Exchange public keys and challenge
+ hello = {srv_pkey, sess_pkey, challenge, state.my_port}
+ :gen_tcp.send(socket, :erlang.term_to_binary hello)
+ {:ok, pkt} = :gen_tcp.recv(socket, 0)
+ {cli_pkey, cli_sess_pkey, cli_challenge, his_port} = :erlang.binary_to_term(pkt, [:safe])
+
+ # Do challenge and check their challenge
+ {:ok, cli_challenge_sign} = Sign.sign_detached(cli_challenge, srv_skey)
+ pkt = encode_pkt(cli_challenge_sign, cli_sess_pkey, sess_skey)
+ :gen_tcp.send(socket, pkt)
+
+ {:ok, pkt} = :gen_tcp.recv(socket, 0)
+ challenge_sign = decode_pkt(pkt, cli_sess_pkey, sess_skey)
+ :ok = Sign.verify_detached(challenge_sign, challenge, cli_pkey)
+
+ expected_suffix = Application.get_env(:shard, :peer_id_suffix)
+ len = byte_size(expected_suffix)
+ ^len = :binary.longest_common_suffix([cli_pkey, expected_suffix])
+
+ # Connected
+ :inet.setopts(socket, [active: true])
+
+ {:ok, {addr, port}} = :inet.peername socket
+ state =%{ socket: socket,
+ my_pkey: srv_pkey,
+ my_skey: srv_skey,
+ his_pkey: cli_pkey,
+ conn_my_pkey: sess_pkey,
+ conn_my_skey: sess_skey,
+ conn_his_pkey: cli_sess_pkey,
+ addr: addr,
+ port: port,
+ his_port: his_port
+ }
+ GenServer.cast(Shard.Manager, {:peer_up, cli_pkey, self(), addr, his_port})
+ Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}"
+
+ {:noreply, state}
+ end
+
+ def handle_cast({:send_msg, msg}, state) do
+ msgbin = :erlang.term_to_binary msg
+ enc = encode_pkt(msgbin, state.conn_his_pkey, state.conn_my_skey)
+ :gen_tcp.send(state.socket, enc)
+ {:noreply, state}
+ end
+
+ defp encode_pkt(pkt, pk, sk) do
+ {:ok, n} = Salty.Random.buf Box.noncebytes
+ {:ok, msg} = Box.easy(pkt, n, pk, sk)
+ n <> msg
+ end
+
+ defp decode_pkt(pkt, pk, sk) do
+ n = binary_part(pkt, 0, Box.noncebytes)
+ enc = binary_part(pkt, Box.noncebytes, (byte_size pkt) - Box.noncebytes)
+ {:ok, msg} = Box.open_easy(enc, n, pk, sk)
+ msg
+ end
+
+ def handle_info({:tcp, _socket, raw_data}, state) do
+ msg = decode_pkt(raw_data, state.conn_his_pkey, state.conn_my_skey)
+ msg_data = :erlang.binary_to_term(msg, [:safe])
+ Shard.Manager.dispatch(state.his_pkey, msg_data)
+ {:noreply, state}
+ end
+
+ def handle_info({:tcp_closed, _socket}, state) do
+ Logger.info "Disconnected: #{print_id state} at #{inspect state.addr}:#{state.port}"
+ GenServer.cast(Shard.Manager, {:peer_down, state.his_pkey, state.addr, state.his_port})
+ exit(:normal)
+ end
+
+ defp print_id(state) do
+ state.his_pkey
+ |> binary_part(0, 8)
+ |> Base.encode16
+ |> String.downcase
+ end
+end
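For clarity, a sketch (not part of the commit) of the packet framing used after the handshake: each packet is a fresh nonce followed by the `crypto_box` ciphertext of the term-encoded message, which is what `encode_pkt/3` and `decode_pkt/3` above implement. The two keypairs below are placeholders for the session keys exchanged during the handshake.

```elixir
# Sketch: frame one message the way SNet.TCPConn does, assuming the libsalty API used above.
require Salty.Box.Curve25519xchacha20poly1305, as: Box
{:ok, their_session_pk, _their_sk} = Box.keypair()   # stands in for the peer's session key
{:ok, _my_pk, my_session_sk} = Box.keypair()
msg = :erlang.term_to_binary({:interested, []})
{:ok, nonce} = Salty.Random.buf(Box.noncebytes())
{:ok, cipher} = Box.easy(msg, nonce, their_session_pk, my_session_sk)
packet = nonce <> cipher     # sent over a `packet: 2` TCP socket
```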
diff --git a/shard/lib/net/tcpserver.ex b/shard/lib/net/tcpserver.ex
new file mode 100644
index 0000000..46552a4
--- /dev/null
+++ b/shard/lib/net/tcpserver.ex
@@ -0,0 +1,26 @@
+defmodule SNet.TCPServer do
+ require Logger
+ use Task, restart: :permanent
+
+ def start_link(port) do
+ Task.start_link(__MODULE__, :accept, [port])
+ end
+
+ @doc """
+ Starts accepting connections on the given `port`.
+ """
+ def accept(port) do
+ {:ok, socket} = :gen_tcp.listen(port,
+ [:binary, packet: 2, active: false, reuseaddr: true])
+ Logger.info "Accepting connections on port #{port}"
+ loop_acceptor(socket, port)
+ end
+
+ defp loop_acceptor(socket, my_port) do
+ {:ok, client} = :gen_tcp.accept(socket)
+ {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port}})
+ :ok = :gen_tcp.controlling_process(client, pid)
+ loop_acceptor(socket, my_port)
+ end
+end
+
diff --git a/shard/mix.exs b/shard/mix.exs
new file mode 100644
index 0000000..7192feb
--- /dev/null
+++ b/shard/mix.exs
@@ -0,0 +1,33 @@
+defmodule Shard.MixProject do
+ use Mix.Project
+
+ def project do
+ [
+ app: :shard,
+ version: "0.1.0",
+ elixir: "~> 1.6",
+ build_embedded: Mix.env == :prod,
+ start_permanent: Mix.env() == :prod,
+ deps: deps(),
+ test_coverage: [tool: ExCoveralls],
+ preferred_cli_env: [coveralls: :test, "coveralls.detail": :test, "coveralls.post": :test, "coveralls.html": :test]
+ ]
+ end
+
+ # Run "mix help compile.app" to learn about applications.
+ def application do
+ [
+ extra_applications: [:logger],
+ mod: {Shard.Application, []}
+ ]
+ end
+
+ # Run "mix help deps" to learn about dependencies.
+ defp deps do
+ [
+ {:excoveralls, "~> 0.10", only: :test},
+
+ {:salty, "~> 0.1.3", hex: :libsalty},
+ ]
+ end
+end
diff --git a/shard/mix.lock b/shard/mix.lock
new file mode 100644
index 0000000..4f92013
--- /dev/null
+++ b/shard/mix.lock
@@ -0,0 +1,20 @@
+%{
+ "certifi": {:hex, :certifi, "2.3.1", "d0f424232390bf47d82da8478022301c561cf6445b5b5fb6a84d49a9e76d2639", [:rebar3], [{:parse_trans, "3.2.0", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm"},
+ "cowboy": {:hex, :cowboy, "1.1.2", "61ac29ea970389a88eca5a65601460162d370a70018afe6f949a29dca91f3bb0", [:rebar3], [{:cowlib, "~> 1.0.2", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3.2", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm"},
+ "cowlib": {:hex, :cowlib, "1.0.2", "9d769a1d062c9c3ac753096f868ca121e2730b9a377de23dec0f7e08b1df84ee", [:make], [], "hexpm"},
+ "elixir_make": {:hex, :elixir_make, "0.4.2", "332c649d08c18bc1ecc73b1befc68c647136de4f340b548844efc796405743bf", [:mix], [], "hexpm"},
+ "ex2ms": {:hex, :ex2ms, "1.5.0", "19e27f9212be9a96093fed8cdfbef0a2b56c21237196d26760f11dfcfae58e97", [:mix], [], "hexpm"},
+ "excoveralls": {:hex, :excoveralls, "0.10.0", "a4508bdd408829f38e7b2519f234b7fd5c83846099cda348efcb5291b081200c", [:mix], [{:hackney, "~> 1.13", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"},
+ "hackney": {:hex, :hackney, "1.13.0", "24edc8cd2b28e1c652593833862435c80661834f6c9344e84b6a2255e7aeef03", [:rebar3], [{:certifi, "2.3.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.2", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"},
+ "idna": {:hex, :idna, "5.1.2", "e21cb58a09f0228a9e0b95eaa1217f1bcfc31a1aaa6e1fdf2f53a33f7dbd9494", [:rebar3], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"},
+ "jason": {:hex, :jason, "1.1.1", "d3ccb840dfb06f2f90a6d335b536dd074db748b3e7f5b11ab61d239506585eb2", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"},
+ "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"},
+ "mime": {:hex, :mime, "1.3.0", "5e8d45a39e95c650900d03f897fbf99ae04f60ab1daa4a34c7a20a5151b7a5fe", [:mix], [], "hexpm"},
+ "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], [], "hexpm"},
+ "parse_trans": {:hex, :parse_trans, "3.2.0", "2adfa4daf80c14dc36f522cf190eb5c4ee3e28008fc6394397c16f62a26258c2", [:rebar3], [], "hexpm"},
+ "plug": {:hex, :plug, "1.3.6", "bcdf94ac0f4bc3b804bdbdbde37ebf598bd7ed2bfa5106ed1ab5984a09b7e75f", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1", [hex: :cowboy, repo: "hexpm", optional: true]}, {:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}], "hexpm"},
+ "ranch": {:hex, :ranch, "1.3.2", "e4965a144dc9fbe70e5c077c65e73c57165416a901bd02ea899cfd95aa890986", [:rebar3], [], "hexpm"},
+ "salty": {:hex, :libsalty, "0.1.3", "13332eb13ac995f5deb76903b44f96f740e1e3a6e511222bffdd8b42cd079ffb", [:make, :mix], [{:elixir_make, "~> 0.4", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm"},
+ "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], [], "hexpm"},
+ "unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [:rebar3], [], "hexpm"},
+}
diff --git a/shard/test/conn_test.exs b/shard/test/conn_test.exs
new file mode 100644
index 0000000..275f6dd
--- /dev/null
+++ b/shard/test/conn_test.exs
@@ -0,0 +1,62 @@
+defmodule ShardTest.Conn do
+ use ExUnit.Case
+ doctest Shard.Application
+
+ require Salty.Box.Curve25519xchacha20poly1305, as: Box
+ require Salty.Sign.Ed25519, as: Sign
+
+ test "crypto connection" do
+ {:ok, srv_pkey, srv_skey} = Sign.keypair
+ {:ok, sess_pkey, sess_skey} = Box.keypair
+ {:ok, challenge} = Salty.Random.buf 32
+ {:ok, socket} = :gen_tcp.connect {127,0,0,1}, 4044, [:binary, packet: 2, active: false]
+
+ hello = {srv_pkey, sess_pkey, challenge, 0}
+ :gen_tcp.send(socket, :erlang.term_to_binary hello)
+ {:ok, pkt} = :gen_tcp.recv(socket, 0)
+ {cli_pkey, cli_sess_pkey, cli_challenge, _his_port} = :erlang.binary_to_term(pkt, [:safe])
+
+ {:ok, cli_challenge_sign} = Sign.sign_detached(cli_challenge, srv_skey)
+ sendmsg(socket, cli_challenge_sign, cli_sess_pkey, sess_skey)
+
+ challenge_sign = recvmsg(socket, cli_sess_pkey, sess_skey)
+ :ok = Sign.verify_detached(challenge_sign, challenge, cli_pkey)
+
+ pkt = :erlang.binary_to_term(recvmsg(socket, cli_sess_pkey, sess_skey), [:safe])
+ IO.puts (inspect pkt)
+ end
+
+ defp sendmsg(sock, msg, pk, sk) do
+ {:ok, n} = Salty.Random.buf Box.noncebytes
+ {:ok, msg} = Box.easy(msg, n, pk, sk)
+ :gen_tcp.send(sock, n <> msg)
+ end
+
+ defp recvmsg(sock, pk, sk) do
+ {:ok, pkt} = :gen_tcp.recv(sock, 0)
+ n = binary_part(pkt, 0, Box.noncebytes)
+ enc = binary_part(pkt, Box.noncebytes, (byte_size pkt) - Box.noncebytes)
+ {:ok, msg} = Box.open_easy(enc, n, pk, sk)
+ msg
+ end
+
+
+ test "set nickname" do
+ Shard.Identity.set_nickname "test bot"
+ end
+
+ test "connect to other instance" do
+ Shard.Manager.add_peer({127, 0, 0, 1}, 4045)
+ receive do after 100 -> nil end
+ end
+
+ test "connect to chat rooms" do
+ {:ok, pid1} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "test"})
+ {:ok, pid2} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "other_test"})
+ GenServer.cast(pid1, {:chat_send, "test msg 1"})
+ GenServer.cast(pid2, {:chat_send, "test msg 2"})
+
+ {:error, :redundant} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "test"})
+ end
+
+end
diff --git a/shard/test/mkllst_test.exs b/shard/test/mkllst_test.exs
new file mode 100644
index 0000000..68dacf3
--- /dev/null
+++ b/shard/test/mkllst_test.exs
@@ -0,0 +1,18 @@
+defmodule ShardTest.MklLst do
+ use ExUnit.Case
+ doctest Shard.Application
+
+ test "merkle list" do
+ alias SData.MerkleList, as: ML
+
+ mkl = ML.new(&SData.cmp_ts_str/2)
+
+ {:ok, [], nil} = ML.read(mkl)
+
+ mkl = ML.insert(mkl, {12, "aa, bb"})
+ mkl = ML.insert_many(mkl, [{14, "qwerty"}, {8, "haha"}])
+ mkl = ML.insert(mkl, {14, "qwerty"})
+ {:ok, list, nil} = ML.read(mkl, nil, nil)
+ assert length(list) == 3
+ end
+end
diff --git a/shard/test/mst_test.exs b/shard/test/mst_test.exs
new file mode 100644
index 0000000..c1758ad
--- /dev/null
+++ b/shard/test/mst_test.exs
@@ -0,0 +1,197 @@
+defmodule ShardTest.MST do
+ use ExUnit.Case
+ alias SData.MerkleSearchTree, as: MST
+ doctest Shard.Application
+
+ test "merkle search tree 1" do
+ y = Enum.reduce(0..1000, %MST{},
+ fn i, acc -> MST.insert(acc, i) end)
+
+
+ z = Enum.reduce(Enum.shuffle(0..1000), %MST{},
+ fn i, acc -> MST.insert(acc, i) end)
+
+ for i <- 0..1000 do
+ assert MST.get(y, i) == true
+ assert MST.get(z, i) == true
+ end
+ assert MST.get(y, 9999) == nil
+ assert MST.get(z, -1001) == nil
+ assert MST.get(z, 1.01) == nil
+
+ IO.puts "y.root: #{y.root|>Base.encode16}"
+ IO.puts "z.root: #{z.root|>Base.encode16}"
+ assert y.root == z.root
+ end
+
+ test "merkle search tree 2" do
+ items = for i <- 0..1000 do
+ h = SData.term_hash i
+ {h, SData.term_hash h}
+ end
+
+ merge = fn a, b -> if a > b do a else b end end
+
+ y = Enum.reduce(items, %MST{merge: merge},
+ fn {k, v}, acc -> MST.insert(acc, k, v) end)
+
+ z = Enum.reduce(Enum.shuffle(items), %MST{merge: merge},
+ fn {k, v}, acc -> MST.insert(acc, k, v) end)
+
+ for {k, v} <- items do
+ assert MST.get(y, k) == v
+ assert MST.get(z, k) == v
+ end
+
+ IO.puts "y.root: #{y.root|>Base.encode16}"
+ IO.puts "z.root: #{z.root|>Base.encode16}"
+ assert y.root == z.root
+ end
+
+ test "merkle search tree 3" do
+ merge = fn a, b -> if a > b do a else b end end
+
+ y = Enum.reduce(0..1000, %MST{merge: merge},
+ fn i, acc -> MST.insert(acc, i, i) end)
+ y = Enum.reduce(0..1000, y,
+ fn i, acc -> MST.insert(acc, i, i + 2 * rem(i, 2) - 1) end)
+
+
+ z = Enum.reduce(Enum.shuffle(0..1000), %MST{merge: merge},
+ fn i, acc -> MST.insert(acc, i, i) end)
+ z = Enum.reduce(Enum.shuffle(0..1000), z,
+ fn i, acc -> MST.insert(acc, i, i + 2 * rem(i, 2) - 1) end)
+
+ for i <- 0..1000 do
+ val = if rem(i, 2) == 1 do i+1 else i end
+ assert MST.get(y, i) == val
+ assert MST.get(z, i) == val
+ end
+ assert MST.get(y, 9999) == nil
+ assert MST.get(z, -1001) == nil
+ assert MST.get(z, 1.01) == nil
+
+ IO.puts "y.root: #{y.root|>Base.encode16}"
+ IO.puts "z.root: #{z.root|>Base.encode16}"
+ assert y.root == z.root
+ end
+
+ test "merkle search tree 4" do
+ items = for i <- 0..1000 do
+ h = SData.term_hash i
+ {h, SData.term_hash h}
+ end
+
+ cmp = fn {a1, b1}, {a2, b2} ->
+ cond do
+ a1 < a2 -> :before
+ a1 > a2 -> :after
+ b1 > b2 -> :before
+ b1 < b2 -> :after
+ true -> :duplicate
+ end
+ end
+
+ y = Enum.reduce(items, %MST{cmp: cmp},
+ fn {a, b}, acc -> MST.insert(acc, {a, b}) end)
+
+ z = Enum.reduce(Enum.shuffle(items), %MST{cmp: cmp},
+ fn {a, b}, acc -> MST.insert(acc, {a, b}) end)
+
+ for {k, v} <- items do
+ assert MST.get(y, {k, v}) == true
+ assert MST.get(z, {k, v}) == true
+ end
+ assert MST.get(z, {"foo", "bar"}) == nil
+
+ IO.puts "y.root: #{y.root|>Base.encode16}"
+ IO.puts "z.root: #{z.root|>Base.encode16}"
+ assert y.root == z.root
+
+ MST.last(y, nil, 10)
+ end
+
+ test "merkle search tree 5: MST.last" do
+ y = Enum.reduce(0..1000, %MST{},
+ fn i, acc -> MST.insert(acc, i) end)
+
+ assert(MST.last(y, nil, 2) == [{999, true}, {1000, true}])
+ assert(MST.last(y, 42, 2) == [{40, true}, {41, true}])
+
+ stuff = for i <- 100..199, do: {i, true}
+ assert MST.last(y, 200, 100) == stuff
+
+ stuff = for i <- 200..299, do: {i, true}
+ assert MST.last(y, 300, 100) == stuff
+
+ stuff = for i <- 200..499, do: {i, true}
+ assert MST.last(y, 500, 300) == stuff
+ end
+
+ test "merkle search tree 6: MST.merge" do
+ y = Enum.reduce([1, 2, 42], %MST{}, fn i, acc -> MST.insert(acc, i) end)
+ z = Enum.reduce([2, 12], %MST{}, fn i, acc -> MST.insert(acc, i) end)
+
+ IO.puts "y: "
+ MST.dump y
+ IO.puts "z: "
+ MST.dump z
+
+ mg1 = MST.merge(y, z)
+ IO.puts "mg1: "
+ MST.dump mg1
+
+ mg2 = MST.merge(y, z)
+ IO.puts "mg2: "
+ MST.dump mg2
+ assert mg1.root == mg2.root
+ end
+
+ test "merkle search tree 7: MST.merge" do
+ items1 = (for i <- 1..1000, do: i*2+40)
+ items2 = (for i <- 1..1000, do: i*3)
+
+ y = Enum.reduce(items1, %MST{}, fn i, acc -> MST.insert(acc, i) end)
+ z = Enum.reduce(items2, %MST{}, fn i, acc -> MST.insert(acc, i) end)
+
+ IO.puts "(merge) y.root: #{y.root|>Base.encode16}"
+ IO.puts "(merge) z.root: #{z.root|>Base.encode16}"
+
+ mg1 = MST.merge(y, z)
+ IO.puts "(merge) mg1.root: #{mg1.root|>Base.encode16}"
+
+ mg2 = MST.merge(y, z)
+ IO.puts "(merge) mg2.root: #{mg2.root|>Base.encode16}"
+
+ assert mg1.root == mg2.root
+
+ items1t = (for i <- items1, do: {i, true})
+ items2t = (for i <- items2, do: {i, true})
+ assert MST.last(y, nil, 1000) == items1t
+ assert MST.last(z, nil, 1000) == items2t
+
+ all_items = (items1t ++ items2t) |> Enum.sort |> Enum.uniq
+ assert MST.last(mg1, nil, 2000) == all_items
+ end
+
+ test "merkle search tree 8: MST.merge callback" do
+ items1 = (for i <- 1..1000, do: i*2+40)
+ items2 = (for i <- 1..1000, do: i*3)
+
+ y = Enum.reduce(items1, %MST{}, fn i, acc -> MST.insert(acc, i) end)
+ z = Enum.reduce(items2, %MST{}, fn i, acc -> MST.insert(acc, i) end)
+
+ {:ok, cb_called} = Agent.start_link fn -> [] end
+
+ cb = fn i, true -> Agent.update(cb_called, fn x -> [i | x] end) end
+ mg = MST.merge(y, z, cb)
+
+ cb_vals = Agent.get cb_called, &(&1)
+ expected = MapSet.difference(MapSet.new(items2), MapSet.new(items1))
+ |> MapSet.to_list
+ |> Enum.sort
+ |> Enum.reverse
+ assert expected == cb_vals
+ end
+
+end
diff --git a/shard/test/test_helper.exs b/shard/test/test_helper.exs
new file mode 100644
index 0000000..e5b6600
--- /dev/null
+++ b/shard/test/test_helper.exs
@@ -0,0 +1,8 @@
+ExUnit.start()
+
+case :gen_tcp.connect('localhost', 4045, []) do
+ {:ok, socket} ->
+ :gen_tcp.close(socket)
+ {:error, _reason} ->
+ Mix.raise "Please have another instance of Shard running at 127.0.0.1:4045, it can be launched with the command: PORT=4045 iex -S mix"
+end