diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/app/blockstore.ex | 149 | ||||
-rw-r--r-- | lib/app/chat.ex | 208 | ||||
-rw-r--r-- | lib/application.ex | 33 | ||||
-rw-r--r-- | lib/cli/cli.ex | 98 | ||||
-rw-r--r-- | lib/data/data.ex | 49 | ||||
-rw-r--r-- | lib/data/merklelist.ex | 143 | ||||
-rw-r--r-- | lib/data/merklesearchtree.ex | 387 | ||||
-rw-r--r-- | lib/data/store.ex | 91 | ||||
-rw-r--r-- | lib/identity.ex | 46 | ||||
-rw-r--r-- | lib/manager.ex | 243 | ||||
-rw-r--r-- | lib/net/tcpconn.ex | 106 | ||||
-rw-r--r-- | lib/net/tcpserver.ex | 26 |
12 files changed, 0 insertions, 1579 deletions
diff --git a/lib/app/blockstore.ex b/lib/app/blockstore.ex deleted file mode 100644 index 8e4fddc..0000000 --- a/lib/app/blockstore.ex +++ /dev/null @@ -1,149 +0,0 @@ -defmodule SApp.BlockStore do - @moduledoc """ - A module that implements a content-adressable storage (blocks, or pages, - identified by the hash of their contents). - - This is not a shard, it is a side process that a shard may use to store its data. - - TODO: WIP - """ - - use GenServer - - @enforce_keys [:pid] - defstruct [:pid, :prefer_ask] - - - defmodule State do - defstruct [:shard_id, :path, :store, :reqs, :retries] - end - - - def start_link(shard_id, path) do - GenServer.start_link(__MODULE__, [shard_id, path]) - end - - def init([shard_id, path]) do - Shard.Manager.dispatch_to(shard_id, path, self()) - - {:ok, %State{shard_id: shard_id, path: path, store: %{}, reqs: %{}, retries: %{}}} - end - - def handle_call({:get, key, prefer_ask}, from, state) do - case state.store[key] do - nil -> - case prefer_ask do - [_ | _] -> - for peer <- prefer_ask do - Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) - end - _ -> - ask_random_peers(state, key) - end - reqs_key = case state.reqs[key] do - nil -> - MapSet.put(MapSet.new(), from) - ms -> - MapSet.put(ms, from) - end - state = put_in(state.reqs[key], reqs_key) - {:noreply, state} - v -> - {:reply, v, state} - end - end - - def handle_call({:put, val}, _from, state) do - hash = SData.term_hash val - state = %{state | store: Map.put(state.store, hash, val)} - {:reply, hash, state} - end - - def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do - state = case msg do - {:get, key} -> - case state.store[key] do - nil -> - Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}}) - v -> - Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, v}}) - end - state - {:info, hash, value} -> - if SData.term_hash value == hash do - reqs = case state.reqs[hash] do - nil -> state.reqs - pids -> - for pid <- pids do - GenServer.reply(pid, value) - end - Map.delete(state.reqs, hash) - end - state = %{state | retries: Map.delete(state.retries, hash)} - %{state | store: Map.put(state.store, hash, value), reqs: reqs} - else - state - end - {:not_found, key} -> - if state.reqs[key] != nil and state.store[key] == nil do - nretry = case state.retries[key] do - nil -> 1 - n -> n+1 - end - if nretry < 3 do - ask_random_peers(state, key) - %{state | retries: Map.put(state.retries, key, nretry)} - else - for pid <- state.reqs[key] do - GenServer.reply(pid, nil) - end - state = %{state | reqs: Map.delete(state.reqs, key)} - state = %{state | retries: Map.delete(state.retries, key)} - state - end - else - state - end - end - {:noreply, state} - end - - def ask_random_peers(state, key) do - peers = :ets.lookup(:shard_peer_db, state.shard_id) - |> Enum.shuffle - |> Enum.take(3) - for {_, peer} <- peers do - Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) - end - end - - - defimpl SData.PageStore do - def put(store, page) do - hash = GenServer.call(store.pid, {:put, page}) - { hash, store } - end - - def get(store, hash) do - try do - GenServer.call(store.pid, {:get, hash, store.prefer_ask}) - catch - :exit, {:timeout, _} -> nil - end - end - - def copy(store, other_store, hash) do - page = SData.PageStore.get(other_store, hash) - refs = SData.Page.refs(page) - for ref <- refs do - copy(store, other_store, ref) - end - GenServer.call(store.pid, {:put, page}) - store - end - - def free(store, _hash) do - store ## DO SOMETHING??? - end - end -end diff --git a/lib/app/chat.ex b/lib/app/chat.ex deleted file mode 100644 index 051fab6..0000000 --- a/lib/app/chat.ex +++ /dev/null @@ -1,208 +0,0 @@ -defmodule SApp.Chat do - @moduledoc """ - Shard application for a replicated chat room with full history. - - Chat rooms are globally identified by their channel name. - A chat room manifest is of the form: - - {:chat, channel_name} - - Future improvements: - - message signing - - storage of the chatroom messages to disk - - storage of the known peers that have this channel to disk - - use a DHT to find peers that are interested in this channel - - epidemic broadcast (carefull not to be too costly, - maybe by limiting the number of peers we talk to) - - partial synchronization only == data distributed over peers - """ - - use GenServer - - require Logger - alias SData.MerkleSearchTree, as: MST - - @doc """ - Start a process that connects to a given channel - """ - def start_link(channel) do - GenServer.start_link(__MODULE__, channel) - end - - @doc """ - Initialize channel process. - """ - def init(channel) do - manifest = {:chat, channel} - id = SData.term_hash manifest - - case Shard.Manager.register(id, manifest, self()) do - :ok -> - Shard.Manager.dispatch_to(id, nil, self()) - {:ok, block_store} = SApp.BlockStore.start_link(id, :block_store) - mst = %MST{store: %SApp.BlockStore{pid: block_store}, - cmp: &msg_cmp/2} - GenServer.cast(self(), :init_pull) - {:ok, - %{channel: channel, - id: id, - manifest: manifest, - block_store: block_store, - mst: mst, - subs: MapSet.new, - } - } - :redundant -> - exit(:redundant) - end - end - - @doc """ - Implementation of the :manifest call that returns the chat room's manifest - """ - def handle_call(:manifest, _from, state) do - {:reply, state.manifest, state} - end - - def handle_call({:read_history, top_bound, num}, _from, state) do - ret = MST.last(state.mst, top_bound, num) - {:reply, ret, state} - end - - @doc """ - Implementation of the :init_pull handler, which is called when the - process starts. It contacts all currently connected peers and asks them to - send data for this channel if they have some. - """ - def handle_cast(:init_pull, state) do - for {_, pid, _, _} <- :ets.tab2list(:peer_db) do - GenServer.cast(pid, {:send_msg, {:interested, [state.id]}}) - end - {:noreply, state} - end - - @doc """ - Implementation of the :chat_send handler. This is the main handler that is used - to send a message to the chat room. Puts the message in the store and syncs - with all connected peers. - """ - def handle_cast({:chat_send, msg}, state) do - msgitem = {(System.os_time :seconds), - Shard.Identity.get_nickname(), - msg} - prev_root = state.mst.root - mst = MST.insert(state.mst, msgitem) - state = %{state | mst: mst} - - for pid <- state.subs do - if Process.alive?(pid) do - send(pid, {:chat_send, state.channel, msgitem}) - end - end - - notif = {:append, prev_root, msgitem, mst.root} - for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do - Shard.Manager.send(peer_id, {state.id, nil, notif}) - end - - {:noreply, state} - end - - @doc """ - Implementation of the :interested handler, this is called when a peer we are - connected to asks to recieve data for this channel. - """ - def handle_cast({:interested, peer_id}, state) do - Shard.Manager.send(peer_id, {state.id, nil, {:root, state.mst.root}}) - {:noreply, state} - end - - def handle_cast({:subscribe, pid}, state) do - Process.monitor(pid) - new_subs = MapSet.put(state.subs, pid) - {:noreply, %{ state | subs: new_subs }} - end - - @doc """ - Implementation of the :msg handler, which is the main handler for messages - comming from other peers concerning this chat room. - - Messages are: - - `{:get, start}`: get some messages starting at a given Merkle hash - - `{:info, start, list, rest}`: put some messages and informs of the - Merkle hash of the store of older messages. - """ - def handle_cast({:msg, peer_id, _shard_id, nil, msg}, state) do - state = case msg do - {:get_manifest} -> - Shard.Manager.send(peer_id, {state.id, nil, {:manifest, state.manifest}}) - state - {:append, prev_root, msgitem, new_root} -> - # Append message: one single mesage has arrived - if new_root == state.mst.root do - # We already have the message, do nothing - state - else - # Try adding the message - if prev_root == state.mst.root do - mst2 = MST.insert(state.mst, msgitem) - if mst2.root == new_root do - # This was the only message missing, we are happy! - state = %{state | mst: mst2} - msg_callback(state, msgitem) - state - else - # More messages are missing, start a full merge - init_merge(state, new_root, peer_id) - end - else - init_merge(state, new_root, peer_id) - end - end - {:root, new_root} -> - if new_root == state.mst.root do - # already up to date, ignore - state - else - init_merge(state, new_root, peer_id) - end - x -> - Logger.info("Unhandled message: #{inspect x}") - state - end - {:noreply, state} - end - - defp init_merge(state, new_root, source_peer) do - # TODO: make the merge asynchronous - mgmst = %{state.mst | root: new_root} - mgmst = put_in(mgmst.store.prefer_ask, [source_peer]) - mst = MST.merge(state.mst, mgmst, fn msgitem, true -> msg_callback(state, msgitem) end) - %{state | mst: mst} - end - - def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do - new_subs = MapSet.delete(state.subs, pid) - {:noreply, %{ state | subs: new_subs }} - end - - defp msg_callback(state, {ts, nick, msg}) do - for pid <- state.subs do - if Process.alive?(pid) do - send(pid, {:chat_recv, state.channel, {ts, nick, msg}}) - end - end - end - - defp msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do - cond do - ts1 > ts2 -> :after - ts1 < ts2 -> :before - nick1 > nick2 -> :after - nick1 < nick2 -> :before - msg1 > msg2 -> :after - msg1 < msg2 -> :before - true -> :duplicate - end - end -end diff --git a/lib/application.ex b/lib/application.ex deleted file mode 100644 index 3e3a6ac..0000000 --- a/lib/application.ex +++ /dev/null @@ -1,33 +0,0 @@ -defmodule Shard.Application do - @moduledoc """ - Main Shard application. - - Shard is a prototype peer-to-peer comunication platform with data - synchronization. - """ - - use Application - - def start(_type, _args) do - import Supervisor.Spec, warn: false - - {listen_port, _} = Integer.parse ((System.get_env "PORT") || "4044") - - # Define workers and child supervisors to be supervised - children = [ - Shard.Identity, - { DynamicSupervisor, strategy: :one_for_one, name: Shard.DynamicSupervisor }, - - # Networking - { SNet.TCPServer, listen_port }, - - # Applications & data store - { Shard.Manager, listen_port }, - ] - - # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html - # for other strategies and supported options - opts = [strategy: :one_for_one, name: Shard.Supervisor] - Supervisor.start_link(children, opts) - end -end diff --git a/lib/cli/cli.ex b/lib/cli/cli.ex deleted file mode 100644 index 2fbf8c2..0000000 --- a/lib/cli/cli.ex +++ /dev/null @@ -1,98 +0,0 @@ -defmodule SCLI do - @moduledoc """ - Small command line interface for the chat application - """ - - def run() do - run(nil) - end - - defp run(pid) do - handle_messages() - - nick = Shard.Identity.get_nickname - prompt = case pid do - nil -> "(no channel) #{nick}: " - _ -> - {:chat, chan} = GenServer.call(pid, :manifest) - "##{chan} #{nick}: " - end - - str = prompt |> IO.gets |> String.trim - cond do - str == "/quit" -> - nil - String.slice(str, 0..0) == "/" -> - command = str |> String.slice(1..-1) |> String.split(" ") - pid2 = handle_command(pid, command) - run(pid2) - true -> - if str != "" do - GenServer.cast(pid, {:chat_send, str}) - end - run(pid) - end - end - - defp handle_messages() do - receive do - {:chat_recv, chan, {ts, nick, msg}} -> - IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} ##{chan} <#{nick}> #{msg}" - handle_messages() - {:chat_send, _, _} -> - # do nothing - handle_messages() - after 10 -> nil - end - end - - defp handle_command(pid, ["connect", ipstr, portstr]) do - {:ok, ip} = :inet.parse_address (to_charlist ipstr) - {port, _} = Integer.parse portstr - Shard.Manager.add_peer(ip, port) - pid - end - - defp handle_command(pid, ["list"]) do - IO.puts "List of known channels:" - - for {_chid, manifest, _chpid} <- :ets.tab2list(:shard_db) do - {:chat, chan} = manifest - IO.puts "##{chan}" - end - pid - end - - defp handle_command(pid, ["hist"]) do - GenServer.call(pid, {:read_history, nil, 25}) - |> Enum.each(fn {{ts, nick, msg}, true} -> - IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}" - end) - pid - end - - defp handle_command(_pid, ["join", qchan]) do - list = for {_chid, manifest, chpid} <- :ets.tab2list(:shard_db), - {:chat, chan} = manifest, - do: {chan, chpid} - case List.keyfind(list, qchan, 0) do - nil -> - {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, qchan}) - GenServer.cast(pid, {:subscribe, self()}) - pid - {_, pid} -> - IO.puts "Switching to ##{qchan}" - pid - end - end - - defp handle_command(pid, ["nick", nick]) do - Shard.Identity.set_nickname nick - pid - end - - defp handle_command(pid, _cmd) do - IO.puts "Invalid command" - pid - end -end diff --git a/lib/data/data.ex b/lib/data/data.ex deleted file mode 100644 index c2c659d..0000000 --- a/lib/data/data.ex +++ /dev/null @@ -1,49 +0,0 @@ -defmodule SData do - @moduledoc """ - Utility functions - - Compare functions are functions that compares stored items and provides a total order. - They must return: - - `:after` if the first argument is more recent - - `:duplicate` if the two items are the same - - `:before` if the first argument is older - These functions must only return :duplicate for equal items. - """ - - @doc """ - Calculate the hash of an Erlang term by first converting it to its - binary representation. - """ - def term_hash(term, algo \\ :sha256) do - :crypto.hash(algo, (:erlang.term_to_binary term)) - end - - @doc""" - Compare function for arbitrary terms using the Erlang order - """ - def cmp_term(a, b) do - cond do - a > b -> :after - a < b -> :before - a == b -> :duplicate - end - end - - @doc""" - Compare function for timestamped strings - """ - def cmp_ts_str({ts1, str1}, {ts2, str2}) do - cond do - ts1 > ts2 -> :after - ts1 < ts2 -> :before - str1 > str2 -> :after - str1 < str2 -> :before - true -> :duplicate - end - end - - @doc""" - Merge function for nils - """ - def merge_true(true, true), do: true -end diff --git a/lib/data/merklelist.ex b/lib/data/merklelist.ex deleted file mode 100644 index 9b44ee8..0000000 --- a/lib/data/merklelist.ex +++ /dev/null @@ -1,143 +0,0 @@ -defmodule SData.MerkleList do - @moduledoc""" - A simple Merkle list store. - - Future improvements: - - When messages are inserted other than at the top, all intermediate hashes - change. Keep track of the mapping from old hashes to new hashes so that get - requests can work even for hashes that are not valid anymore. - - group items in "pages" (bigger bundles) - """ - - defstruct [:root, :top, :cmp, :store] - - @doc""" - Create a Merkle list store. - - `cmp` is a compare function that respects the interface defined in module `SData`. - """ - def new(cmp) do - root_item = :root - root_hash = SData.term_hash root_item - state = %SData.MerkleList{ - root: root_hash, - top: root_hash, - cmp: cmp, - store: %{ root_hash => root_item } - } - state - end - - defp push(state, item) do - new_item = {item, state.top} - new_item_hash = SData.term_hash new_item - new_store = Map.put(state.store, new_item_hash, new_item) - %{ state | :top => new_item_hash, :store => new_store } - end - - defp pop(state) do - if state.top == state.root do - :error - else - {item, next} = Map.get(state.store, state.top) - new_store = Map.delete(state.store, state.top) - new_state = %{ state | :top => next, :store => new_store } - {:ok, item, new_state} - end - end - - @doc""" - Insert a list of items in the store. - - A callback function may be specified that is called on any item - that is sucessfully added, i.e. that wasn't present in the store before. - """ - def insert_many(state, items, callback \\ (fn _ -> nil end)) do - items_sorted = Enum.sort(items, fn (x, y) -> state.cmp.(x, y) == :after end) - insert_many_aux(state, items_sorted, callback) - end - - defp insert_many_aux(state, [], _callback) do - state - end - - defp insert_many_aux(state, [item | rest], callback) do - case pop(state) do - :error -> - new_state = push(insert_many_aux(state, rest, callback), item) - callback.(item) - new_state - {:ok, front, state_rest} -> - case state.cmp.(item, front) do - :after -> - new_state = push(insert_many_aux(state, rest, callback), item) - callback.(item) - new_state - :duplicate -> insert_many_aux(state, rest, callback) - :before -> push(insert_many_aux(state_rest, [item | rest], callback), front) - end - end - end - - @doc""" - Insert a single item in the store. - - A callback function may be specified that is called on the item - if it is sucessfully added, i.e. it wasn't present in the store before. - """ - def insert(state, item, callback \\ (fn _ -> nil end)) do - insert_many(state, [item], callback) - end - - @doc""" - Read some items from the state. - - The two parameters are optional: - - qbegin : hash of the first item to read - - qlimit : number of items to read - """ - def read(state, qbegin \\ nil, qlimit \\ nil) do - begin = qbegin || state.top - limit = qlimit || 20 - get_items_list(state, begin, limit) - end - - @doc""" - Get the hash of the last item - """ - def top(state) do - state.top - end - - @doc""" - Get the hash of the root item - """ - def root(state) do - state.root - end - - @doc""" - Check if the store holds a certain item - """ - def has(state, hash) do - Map.has_key?(state.store, hash) - end - - defp get_items_list(state, begin, limit) do - case limit do - 0 -> {:ok, [], begin} - _ -> - case Map.fetch(state.store, begin) do - {:ok, :root} -> - {:ok, [], nil } - {:ok, {item, next}} -> - case get_items_list(state, next, limit - 1) do - {:ok, rest, past} -> - {:ok, [ item | rest ], past } - {:error, reason} -> {:error, reason} - end - :error -> {:error, begin} - end - end - end -end diff --git a/lib/data/merklesearchtree.ex b/lib/data/merklesearchtree.ex deleted file mode 100644 index 941d31d..0000000 --- a/lib/data/merklesearchtree.ex +++ /dev/null @@ -1,387 +0,0 @@ -defmodule SData.MerkleSearchTree do - @moduledoc""" - A Merkle search tree. - - A node of the tree is - { - level, - hash_of_node | nil, - [ - { item_low_bound, hash_of_node | nil }, - { item_low_bound, hash_of_node | nil }, - ... - } - } - """ - - alias SData.PageStore, as: Store - - @doc""" - Create a new Merkle search tree. - - This structure can be used as a set with only true keys, - or as a map if a merge function is given. - - `cmp` is a compare function for keys as defined in module `SData`. - - `merge` is a function for merging two items that have the same key. - """ - defstruct root: nil, - store: SData.LocalStore.new, - cmp: &SData.cmp_term/2, - merge: &SData.merge_true/2 - - - defmodule Page do - defstruct [:level, :low, :list] - end - - defimpl SData.Page, for: Page do - def refs(page) do - refs = for {_, _, h} <- page.list, h != nil, do: h - if page.low != nil do - [ page.low | refs ] - else - refs - end - end - end - - - @doc""" - Insert an item into the search tree. - """ - def insert(state, key, value \\ true) do - level = calc_level(key) - {hash, store} = insert_at(state, state.store, state.root, key, level, value) - %{ state | root: hash, store: store } - end - - defp insert_at(s, store, root, key, level, value) do - {new_page, store} = if root == nil do - { %Page{ level: level, low: nil, list: [ { key, value, nil } ] }, store } - else - %Page{ level: plevel, low: low, list: lst } = Store.get(store, root) - [ { k0, _, _} | _ ] = lst - cond do - plevel < level -> - {low, high, store} = split(s, store, root, key) - { %Page{ level: level, low: low, list: [ { key, value, high } ] }, store } - plevel == level -> - store = Store.free(store, root) - case s.cmp.(key, k0) do - :before -> - {low2a, low2b, store} = split(s, store, low, key) - { %Page{ level: level, low: low2a, list: [ { key, value, low2b } | lst] }, store } - _ -> - {new_lst, store} = aux_insert_after_first(s, store, lst, key, value) - { %Page{ level: plevel, low: low, list: new_lst }, store } - end - plevel > level -> - store = Store.free(store, root) - case s.cmp.(key, k0) do - :before -> - {new_low, store} = insert_at(s, store, low, key, level, value) - { %Page{ level: plevel, low: new_low, list: lst }, store } - :after -> - {new_lst, store} = aux_insert_sub_after_first(s, store, lst, key, level, value) - { %Page{ level: plevel, low: low, list: new_lst }, store } - end - end - end - Store.put(store, new_page) # returns {hash, store} - end - - defp split(s, store, hash, key) do - if hash == nil do - {nil, nil, store} - else - %Page{ level: level, low: low, list: lst } = Store.get(store, hash) || Store.get(s.store, hash) - store = Store.free(store, hash) - [ { k0, _, _} | _ ] = lst - case s.cmp.(key, k0) do - :before -> - {lowlow, lowhi, store} = split(s, store, low, key) - newp2 = %Page{ level: level, low: lowhi, list: lst } - {newp2h, store} = Store.put(store, newp2) - {lowlow, newp2h, store} - :after -> - {lst1, p2, store} = split_aux(s, store, lst, key, level) - newp1 = %Page{ level: level, low: low, list: lst1} - {newp1h, store} = Store.put(store, newp1) - {newp1h, p2, store} - end - end - end - - defp split_aux(s, store, lst, key, level) do - case lst do - [ {k1, v1, r1} ] -> - if s.cmp.(k1, key) == :duplicate do - raise "Bad logic" - end - {r1l, r1h, store} = split(s, store, r1, key) - { [{k1, v1, r1l}], r1h, store } - [ {k1, v1, r1} = fst, {k2, v2, r2} = rst1 | rst ] -> - case s.cmp.(key, k2) do - :before -> - {r1l, r1h, store} = split(s, store, r1, key) - p2 = %Page{ level: level, low: r1h, list: [ {k2, v2, r2} | rst ] } - {p2h, store} = Store.put(store, p2) - { [{k1, v1, r1l}], p2h, store } - :after -> - {rst2, hi, store} = split_aux(s, store, [rst1 | rst], key, level) - { [ fst | rst2 ], hi, store } - :duplicate -> - raise "Bad logic" - end - end - end - - defp aux_insert_after_first(s, store, lst, key, value) do - case lst do - [ {k1, v1, r1} ] -> - case s.cmp.(k1, key) do - :duplicate -> - { [ {k1, s.merge.(v1, value), r1} ], store } - :before -> - {r1a, r1b, new_store} = split(s, store, r1, key) - { [ {k1, v1, r1a}, {key, value, r1b} ], new_store } - end - [ {k1, v1, r1} = fst, {k2, v2, r2} = rst1 | rst ] -> - case s.cmp.(k1, key) do - :duplicate -> - { [ {k1, s.merge.(v1, value), r1}, rst1 | rst ], store } - :before -> - case s.cmp.(k2, key) do - :after -> - {r1a, r1b, new_store} = split(s, store, r1, key) - { [ {k1, v1, r1a}, {key, value, r1b}, {k2, v2, r2} | rst ], new_store } - _ -> - {rst2, new_store} = aux_insert_after_first(s, store, [rst1 | rst], key, value) - { [ fst | rst2 ], new_store } - end - end - end - end - - defp aux_insert_sub_after_first(s, store, lst, key, level, value) do - case lst do - [ {k1, v1, r1} ] -> - if s.cmp.(k1, key) == :duplicate do - raise "Bad logic" - end - {r1new, store_new} = insert_at(s, store, r1, key, level, value) - { [{k1, v1, r1new}], store_new } - [ {k1, v1, r1} = fst, {k2, _, _} = rst1 | rst ] -> - if s.cmp.(k1, key) == :duplicate do - raise "Bad logic" - end - case s.cmp.(key, k2) do - :before -> - {r1new, store_new} = insert_at(s, store, r1, key, level, value) - { [{k1, v1, r1new}, rst1 | rst], store_new } - _ -> - {rst2, new_store} = aux_insert_sub_after_first(s, store, [rst1 |rst], key, level, value) - { [ fst | rst2 ], new_store } - end - end - end - - @doc""" - Merge values from another MST in this MST. - - The merge is not symmetrical in the sense that: - - new pages are added in the store of the first argument - - the callback is called for all items found in the second argument and not the first - """ - def merge(to, from, callback \\ fn _, _ -> nil end) do - { store, root } = merge_aux(to, from, to.store, to.root, from.root, callback) - %{ to | store: store, root: root } - end - - defp merge_aux(s1, s2, store, r1, r2, callback) do - case {r1, r2} do - _ when r1 == r2 -> { store, r1 } - {_, nil} -> - { store, r1 } - {nil, _} -> - store = Store.copy(store, s2.store, r2) - rec_callback(store, r2, callback) - { store, r2 } - _ -> - %Page{ level: level1, low: low1, list: lst1 } = Store.get(store, r1) - %Page{ level: level2, low: low2, list: lst2 } = Store.get(store, r2) || Store.get(s2.store, r2) - { level, low1, lst1, low2, lst2 } = cond do - level1 == level2 -> {level1, low1, lst1, low2, lst2} - level1 > level2 -> {level1, low1, lst1, r2, []} - level2 > level1 -> {level2, r1, [], low2, lst2} - end - { store, low, lst } = merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback) - page = %Page{ level: level, low: low, list: lst } - {hash, store} = Store.put(store, page) - {store, hash} - end - end - - defp merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback) do - case {lst1, lst2} do - { [], [] } -> - {store, hash} = merge_aux(s1, s2, store, low1, low2, callback) - {store, hash, []} - { [], [ {k, v, r} | rst2 ] } -> - {low1l, low1h, store} = split(s1, store, low1, k) - {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback) - callback.(k, v) - {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, [], r, rst2, callback) - {store, newlow, [ {k, v, newr} | newrst ]} - { [ {k, v, r} | rst1 ], [] } -> - {low2l, low2h, store} = split(s2, store, low2, k) - {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback) - {store, newr, newrst} = merge_aux_rec(s1, s2, store, r, rst1, low2h, [], callback) - {store, newlow, [ {k, v, newr} | newrst ]} - { [ {k1, v1, r1} | rst1 ], [ {k2, v2, r2} | rst2 ] } -> - case s1.cmp.(k1, k2) do - :before -> - {low2l, low2h, store} = split(s2, store, low2, k1) - {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback) - {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, low2h, lst2, callback) - {store, newlow, [ {k1, v1, newr} | newrst ]} - :after -> - {low1l, low1h, store} = split(s1, store, low1, k2) - {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback) - callback.(k2, v2) - {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, lst1, r2, rst2, callback) - {store, newlow, [ {k2, v2, newr} | newrst ]} - :duplicate -> - {store, newlow} = merge_aux(s1, s2, store, low1, low2, callback) - newv = s1.merge.(v1, v2) ## TODO: callback here ?? - {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, r2, rst2, callback) - {store, newlow, [ {k1, newv, newr} | newrst ]} - end - end - end - - defp rec_callback(store, root, callback) do - case root do - nil -> nil - _ -> - %Page{ level: _, low: low, list: lst } = Store.get(store, root) - rec_callback(store, low, callback) - for {k, v, rst} <- lst do - callback.(k, v) - rec_callback(store, rst, callback) - end - end - end - - @doc""" - Get value for a specific key in search tree. - """ - def get(state, key) do - get(state, state.root, key) - end - - defp get(s, root, key) do - case root do - nil -> nil - _ -> - %Page{ level: _, low: low, list: lst } = Store.get(s.store, root) - get_aux(s, low, lst, key) - end - end - - defp get_aux(s, low, lst, key) do - case lst do - [] -> - get(s, low, key) - [ {k, v, low2} | rst ] -> - case s.cmp.(key, k) do - :duplicate -> v - :before -> - get(s, low, key) - :after -> - get_aux(s, low2, rst, key) - end - end - end - - @doc""" - Get the last n items of the tree, or the last n items - strictly before given upper bound if non nil - """ - def last(state, top_bound, num) do - last(state, state.root, top_bound, num) - end - - defp last(s, root, top_bound, num) do - case root do - nil -> [] - _ -> - %Page{ level: _, low: low, list: lst } = Store.get(s.store, root) - last_aux(s, low, lst, top_bound, num) - end - end - - defp last_aux(s, low, lst, top_bound, num) do - case lst do - [] -> - last(s, low, top_bound, num) - [ {k, v, low2} | rst ] -> - if top_bound == nil or s.cmp.(top_bound, k) == :after do - items = last_aux(s, low2, rst, top_bound, num) - items = if Enum.count(items) < num do - [ {k, v} | items ] - else - items - end - cnt = Enum.count items - if cnt < num do - last(s, low, top_bound, num - cnt) ++ items - else - items - end - else - last(s, low, top_bound, num) - end - end - end - - - @doc""" - Dump Merkle search tree structure. - """ - def dump(state) do - dump(state.store, state.root, "") - end - - defp dump(store, root, lvl) do - case root do - nil -> - IO.puts(lvl <> "nil") - _ -> - %Page{ level: level, low: low, list: lst} = Store.get(store, root) - IO.puts(lvl <> "#{root|>Base.encode16} (#{level})") - dump(store, low, lvl <> " ") - for {k, v, r} <- lst do - IO.puts(lvl<>"- #{inspect k} => #{inspect v}") - dump(store, r, lvl <> " ") - end - end - end - - defp calc_level(key) do - key - |> SData.term_hash - |> Base.encode16 - |> String.to_charlist - |> count_leading_zeroes - end - - defp count_leading_zeroes('0' ++ rest) do - 1 + count_leading_zeroes(rest) - end - defp count_leading_zeroes(_) do - 0 - end -end diff --git a/lib/data/store.ex b/lib/data/store.ex deleted file mode 100644 index ca12cd0..0000000 --- a/lib/data/store.ex +++ /dev/null @@ -1,91 +0,0 @@ -defprotocol SData.Page do - @moduledoc""" - Protocol to be implemented by objects that are used as data pages - in a pagestore and that may reference other data pages by their hash. - """ - - @fallback_to_any true - - @doc""" - Get hashes of all pages referenced by this page. - """ - def refs(page) -end - -defimpl SData.Page, for: Any do - def refs(_page), do: [] -end - - -defprotocol SData.PageStore do - @moduledoc""" - Protocol to be implemented for page stores to allow their - manipulation. - - This protocol may also be implemented by store proxies that track - operations and implement different synchronization or caching mechanisms. - """ - - @doc""" - Put a page. Argument is the content of the page, returns the - hash that the store has associated to it. - - Returns {hash, store} - """ - def put(store, page) - - @doc""" - Get a page referenced by its hash. - - Returns page - """ - def get(store, hash) - - @doc""" - Copy to the store a page and all its references from the other store. - In the case of pages on the network in a distributed store, this may - be lazy. - - Returns store - """ - def copy(store, other_store, hash) - - @doc""" - Free a page referenced by its hash, marking it as no longer needed. - - Returns store - """ - def free(store, hash) -end - - -defmodule SData.LocalStore do - defstruct [:pages] - - def new() do - %SData.LocalStore{ pages: %{} } - end -end - -defimpl SData.PageStore, for: SData.LocalStore do - def put(store, page) do - hash = SData.term_hash page - store = %{ store | pages: Map.put(store.pages, hash, page) } - { hash, store } - end - - def get(store, hash) do - store.pages[hash] - end - - def copy(store, other_store, hash) do - page = SData.PageStore.get(other_store, hash) - refs = SData.Page.refs(page) - store = Enum.reduce(refs, store, fn x, acc -> copy(acc, other_store, x) end) - %{ store | pages: Map.put(store.pages, hash, page) } - end - - def free(store, hash) do - %{ store | pages: Map.delete(store.pages, hash) } - end -end diff --git a/lib/identity.ex b/lib/identity.ex deleted file mode 100644 index f1899df..0000000 --- a/lib/identity.ex +++ /dev/null @@ -1,46 +0,0 @@ -defmodule Shard.Identity do - use Agent - require Salty.Sign.Ed25519, as: Sign - require Logger - - def start_link(_) do - Agent.start_link(__MODULE__, :init, [], name: __MODULE__) - end - - def init() do - Logger.info "Generating keypair..." - {pk, sk} = gen_keypair(Application.get_env(:shard, :peer_id_suffix)) - nick_suffix = pk - |> binary_part(0, 3) - |> Base.encode16 - |> String.downcase - %{ - keypair: {pk, sk}, - nickname: "Anon" <> nick_suffix, - } - end - - defp gen_keypair(suffix, n \\ 0) do - {:ok, pk, sk} = Sign.keypair - if rem(n, 10000) == 0 do - Logger.info "#{n}... expected #{:math.pow(256, byte_size(suffix))}" - end - if :binary.longest_common_suffix([pk, suffix]) == byte_size(suffix) do - {pk, sk} - else - gen_keypair(suffix, n+1) - end - end - - def get_keypair() do - Agent.get(__MODULE__, &(&1.keypair)) - end - - def get_nickname() do - Agent.get(__MODULE__, &(&1.nickname)) - end - - def set_nickname(newnick) do - Agent.update(__MODULE__, &(%{&1 | nickname: newnick})) - end -end diff --git a/lib/manager.ex b/lib/manager.ex deleted file mode 100644 index 82984d6..0000000 --- a/lib/manager.ex +++ /dev/null @@ -1,243 +0,0 @@ -defmodule Shard.Manager do - @moduledoc""" - Maintains several important tables : - - - :peer_db - - List of - { id, {conn_pid, con_start, conn_n_msg} | nil, ip, port, last_seen } - - - :shard_db - - List of - { id, manifest, pid | nil } - - - :shard_procs - - List of - { {id, path}, pid } - - - :shard_peer_db - - Mult-list of - { shard_id, peer_id } - - - And an internal table : - - - :outbox - - Multi-list of - { peer_id, message } - - """ - - use GenServer - - require Logger - - def start_link(my_port) do - GenServer.start_link(__MODULE__, my_port, name: __MODULE__) - end - - def init(my_port) do - :ets.new(:peer_db, [:set, :protected, :named_table]) - :ets.new(:shard_db, [:set, :protected, :named_table]) - :ets.new(:shard_procs, [:set, :protected, :named_table]) - :ets.new(:shard_peer_db, [:bag, :protected, :named_table]) - outbox = :ets.new(:outbox, [:bag, :private]) - - {:ok, %{my_port: my_port, outbox: outbox} } - end - - def handle_call({:register, shard_id, manifest, pid}, _from, state) do - will_live = case :ets.lookup(:shard_db, shard_id) do - [{ ^shard_id, _, pid }] -> not Process.alive?(pid) - _ -> true - end - reply = if will_live do - Process.monitor(pid) - :ets.insert(:shard_db, {shard_id, manifest, pid}) - :ok - else - :redundant - end - {:reply, reply, state} - end - - def handle_cast({:dispatch_to, shard_id, path, pid}, state) do - :ets.insert(:shard_procs, { {shard_id, path}, pid }) - Process.monitor(pid) - {:noreply, state} - end - - def handle_cast({:interested, peer_id, shards}, state) do - for shard_id <- shards do - case :ets.lookup(:shard_db, shard_id) do - [{ ^shard_id, _, pid }] -> - :ets.insert(:shard_peer_db, {shard_id, peer_id}) - GenServer.cast(pid, {:interested, peer_id}) - [] -> nil - end - end - {:noreply, state} - end - - def handle_cast({:not_interested, peer_id, shard_id}, state) do - :ets.match_delete(:shard_peer_db, {shard_id, peer_id}) - {:noreply, state} - end - - def handle_cast({:shard_peer_db_insert, shard_id, peer_id}, state) do - :ets.insert(:shard_peer_db, {shard_id, peer_id}) - {:noreply, state} - end - - def handle_cast({:peer_up, pk, pid, ip, port}, state) do - for [pk2] <- :ets.match(:peer_db, {:'$1', :_, ip, port}) do - if pk2 != pk do - # obsolete peer information - :ets.delete(:peer_db, pk2) - :ets.match_delete(:shard_peer_db, {:_, pk2}) - end - end - :ets.insert(:peer_db, {pk, pid, ip, port}) - - # Send interested message for all our shards - id_list = (for {id, _, _} <- :ets.tab2list(:shard_db), do: id) - GenServer.cast(pid, {:send_msg, {:interested, id_list}}) - - # Send queued messages - for {_, msg, _} <- :ets.lookup(state.outbox, pk) do - GenServer.cast(pid, {:send_msg, msg}) - end - :ets.delete(state.outbox, pk) - - {:noreply, state} - end - - def handle_cast({:peer_down, pk, ip, port}, state) do - :ets.insert(:peer_db, {pk, nil, ip, port}) - {:noreply, state} - end - - def handle_cast({:connect_and_send, peer_id, msg}, state) do - case :ets.lookup(:peer_db, peer_id) do - [{^peer_id, nil, ip, port}] -> - add_peer(ip, port, state) - currtime = System.os_time :second - :ets.insert(state.outbox, {peer_id, msg, currtime}) - outbox_cleanup = [{{:_, :_, :'$1'}, - [{:<, :'$1', currtime - 60}], - [:'$1']}] - :ets.select_delete(state.outbox, outbox_cleanup) - _ -> - Logger.info "Dropping message #{inspect msg} for peer #{inspect peer_id}: peer not in database" - end - {:noreply, state} - end - - def handle_cast({:try_connect, pk_list}, state) do - for pk <- pk_list do - case :ets.lookup(:peer_db, pk) do - [{^pk, nil, ip, port}] -> - add_peer(ip, port, state) - _ -> nil - end - end - {:noreply, state} - end - - def handle_cast({:add_peer, ip, port}, state) do - add_peer(ip, port, state) - {:noreply, state} - end - - def handle_info({:DOWN, _, :process, pid, _}, state) do - :ets.match_delete(:shard_procs, {:_, pid}) - {:noreply, state} - end - - defp add_peer(ip, port, state) do - spawn fn -> - case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do - {:ok, client} -> - {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}}) - :ok = :gen_tcp.controlling_process(client, pid) - _ -> - Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped" - end - end - end - - - # ================ - # PUBLIC INTERFACE - # ================ - - - @doc""" - Connect to a peer specified by ip address and port - """ - def add_peer(ip, port) do - GenServer.cast(__MODULE__, {:add_peer, ip, port}) - end - - @doc""" - Send message to a peer specified by peer id - """ - def send(peer_id, msg) do - case :ets.lookup(:peer_db, peer_id) do - [{ ^peer_id, pid, _, _}] when pid != nil-> - GenServer.cast(pid, {:send_msg, msg}) - _ -> - GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg}) - end - end - - @doc""" - Dispatch incoming message to correct shard process - """ - def dispatch(peer_id, {shard_id, path, msg}) do - case :ets.lookup(:shard_db, shard_id) do - [] -> - __MODULE__.send(peer_id, {:not_interested, shard_id}) - [_] -> - case :ets.match(:shard_peer_db, {shard_id, peer_id}) do - [] -> - GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_id}) - _ -> nil - end - case :ets.lookup(:shard_procs, {shard_id, path}) do - [{ {^shard_id, ^path}, pid }] -> - GenServer.cast(pid, {:msg, peer_id, shard_id, path, msg}) - [] -> - Logger.info("Warning: dropping message for #{inspect shard_id}/#{inspect path}, no handler running.\n\t#{inspect msg}") - end - end - end - - def dispatch(peer_id, {:interested, shards}) do - GenServer.cast(__MODULE__, {:interested, peer_id, shards}) - end - - def dispatch(peer_id, {:not_interested, shard}) do - GenServer.cast(__MODULE__, {:not_interested, peer_id, shard}) - end - - @doc""" - Register a process as the main process for a shard. - - Returns either :ok or :redundant, in which case the process must exit. - """ - def register(shard_id, manifest, pid) do - GenServer.call(__MODULE__, {:register, shard_id, manifest, pid}) - end - - @doc""" - Register a process as the handler for shard packets for a given path. - """ - def dispatch_to(shard_id, path, pid) do - GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid}) - end -end diff --git a/lib/net/tcpconn.ex b/lib/net/tcpconn.ex deleted file mode 100644 index 44669bb..0000000 --- a/lib/net/tcpconn.ex +++ /dev/null @@ -1,106 +0,0 @@ -defmodule SNet.TCPConn do - use GenServer, restart: :temporary - require Salty.Box.Curve25519xchacha20poly1305, as: Box - require Salty.Sign.Ed25519, as: Sign - require Logger - - def start_link(state) do - GenServer.start_link(__MODULE__, state) - end - - def init(state) do - GenServer.cast(self(), :handshake) - {:ok, state} - end - - def handle_call(:get_host_str, _from, state) do - {:reply, "#{state.his_pkey|>Base.encode16|>String.downcase}@#{to_string(:inet_parse.ntoa(state.addr))}:#{state.port}", state} - end - - def handle_cast(:handshake, state) do - socket = state.socket - - {srv_pkey, srv_skey} = Shard.Identity.get_keypair - {:ok, sess_pkey, sess_skey} = Box.keypair - {:ok, challenge} = Salty.Random.buf 32 - - # Exchange public keys and challenge - hello = {srv_pkey, sess_pkey, challenge, state.my_port} - :gen_tcp.send(socket, :erlang.term_to_binary hello) - {:ok, pkt} = :gen_tcp.recv(socket, 0) - {cli_pkey, cli_sess_pkey, cli_challenge, his_port} = :erlang.binary_to_term(pkt, [:safe]) - - # Do challenge and check their challenge - {:ok, cli_challenge_sign} = Sign.sign_detached(cli_challenge, srv_skey) - pkt = encode_pkt(cli_challenge_sign, cli_sess_pkey, sess_skey) - :gen_tcp.send(socket, pkt) - - {:ok, pkt} = :gen_tcp.recv(socket, 0) - challenge_sign = decode_pkt(pkt, cli_sess_pkey, sess_skey) - :ok = Sign.verify_detached(challenge_sign, challenge, cli_pkey) - - expected_suffix = Application.get_env(:shard, :peer_id_suffix) - len = byte_size(expected_suffix) - ^len = :binary.longest_common_suffix([cli_pkey, expected_suffix]) - - # Connected - :inet.setopts(socket, [active: true]) - - {:ok, {addr, port}} = :inet.peername socket - state =%{ socket: socket, - my_pkey: srv_pkey, - my_skey: srv_skey, - his_pkey: cli_pkey, - conn_my_pkey: sess_pkey, - conn_my_skey: sess_skey, - conn_his_pkey: cli_sess_pkey, - addr: addr, - port: port, - his_port: his_port - } - GenServer.cast(Shard.Manager, {:peer_up, cli_pkey, self(), addr, his_port}) - Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}" - - {:noreply, state} - end - - def handle_cast({:send_msg, msg}, state) do - msgbin = :erlang.term_to_binary msg - enc = encode_pkt(msgbin, state.conn_his_pkey, state.conn_my_skey) - :gen_tcp.send(state.socket, enc) - {:noreply, state} - end - - defp encode_pkt(pkt, pk, sk) do - {:ok, n} = Salty.Random.buf Box.noncebytes - {:ok, msg} = Box.easy(pkt, n, pk, sk) - n <> msg - end - - defp decode_pkt(pkt, pk, sk) do - n = binary_part(pkt, 0, Box.noncebytes) - enc = binary_part(pkt, Box.noncebytes, (byte_size pkt) - Box.noncebytes) - {:ok, msg} = Box.open_easy(enc, n, pk, sk) - msg - end - - def handle_info({:tcp, _socket, raw_data}, state) do - msg = decode_pkt(raw_data, state.conn_his_pkey, state.conn_my_skey) - msg_data = :erlang.binary_to_term(msg, [:safe]) - Shard.Manager.dispatch(state.his_pkey, msg_data) - {:noreply, state} - end - - def handle_info({:tcp_closed, _socket}, state) do - Logger.info "Disconnected: #{print_id state} at #{inspect state.addr}:#{state.port}" - GenServer.cast(Shard.Manager, {:peer_down, state.his_pkey, state.addr, state.his_port}) - exit(:normal) - end - - defp print_id(state) do - state.his_pkey - |> binary_part(0, 8) - |> Base.encode16 - |> String.downcase - end -end diff --git a/lib/net/tcpserver.ex b/lib/net/tcpserver.ex deleted file mode 100644 index 46552a4..0000000 --- a/lib/net/tcpserver.ex +++ /dev/null @@ -1,26 +0,0 @@ -defmodule SNet.TCPServer do - require Logger - use Task, restart: :permanent - - def start_link(port) do - Task.start_link(__MODULE__, :accept, [port]) - end - - @doc """ - Starts accepting connections on the given `port`. - """ - def accept(port) do - {:ok, socket} = :gen_tcp.listen(port, - [:binary, packet: 2, active: false, reuseaddr: true]) - Logger.info "Accepting connections on port #{port}" - loop_acceptor(socket, port) - end - - defp loop_acceptor(socket, my_port) do - {:ok, client} = :gen_tcp.accept(socket) - {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port}}) - :ok = :gen_tcp.controlling_process(client, pid) - loop_acceptor(socket, my_port) - end -end - |