aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/app/blockstore.ex149
-rw-r--r--lib/app/chat.ex208
-rw-r--r--lib/application.ex33
-rw-r--r--lib/cli/cli.ex98
-rw-r--r--lib/data/data.ex49
-rw-r--r--lib/data/merklelist.ex143
-rw-r--r--lib/data/merklesearchtree.ex387
-rw-r--r--lib/data/store.ex91
-rw-r--r--lib/identity.ex46
-rw-r--r--lib/manager.ex243
-rw-r--r--lib/net/tcpconn.ex106
-rw-r--r--lib/net/tcpserver.ex26
12 files changed, 0 insertions, 1579 deletions
diff --git a/lib/app/blockstore.ex b/lib/app/blockstore.ex
deleted file mode 100644
index 8e4fddc..0000000
--- a/lib/app/blockstore.ex
+++ /dev/null
@@ -1,149 +0,0 @@
-defmodule SApp.BlockStore do
- @moduledoc """
- A module that implements a content-adressable storage (blocks, or pages,
- identified by the hash of their contents).
-
- This is not a shard, it is a side process that a shard may use to store its data.
-
- TODO: WIP
- """
-
- use GenServer
-
- @enforce_keys [:pid]
- defstruct [:pid, :prefer_ask]
-
-
- defmodule State do
- defstruct [:shard_id, :path, :store, :reqs, :retries]
- end
-
-
- def start_link(shard_id, path) do
- GenServer.start_link(__MODULE__, [shard_id, path])
- end
-
- def init([shard_id, path]) do
- Shard.Manager.dispatch_to(shard_id, path, self())
-
- {:ok, %State{shard_id: shard_id, path: path, store: %{}, reqs: %{}, retries: %{}}}
- end
-
- def handle_call({:get, key, prefer_ask}, from, state) do
- case state.store[key] do
- nil ->
- case prefer_ask do
- [_ | _] ->
- for peer <- prefer_ask do
- Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
- end
- _ ->
- ask_random_peers(state, key)
- end
- reqs_key = case state.reqs[key] do
- nil ->
- MapSet.put(MapSet.new(), from)
- ms ->
- MapSet.put(ms, from)
- end
- state = put_in(state.reqs[key], reqs_key)
- {:noreply, state}
- v ->
- {:reply, v, state}
- end
- end
-
- def handle_call({:put, val}, _from, state) do
- hash = SData.term_hash val
- state = %{state | store: Map.put(state.store, hash, val)}
- {:reply, hash, state}
- end
-
- def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do
- state = case msg do
- {:get, key} ->
- case state.store[key] do
- nil ->
- Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}})
- v ->
- Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, v}})
- end
- state
- {:info, hash, value} ->
- if SData.term_hash value == hash do
- reqs = case state.reqs[hash] do
- nil -> state.reqs
- pids ->
- for pid <- pids do
- GenServer.reply(pid, value)
- end
- Map.delete(state.reqs, hash)
- end
- state = %{state | retries: Map.delete(state.retries, hash)}
- %{state | store: Map.put(state.store, hash, value), reqs: reqs}
- else
- state
- end
- {:not_found, key} ->
- if state.reqs[key] != nil and state.store[key] == nil do
- nretry = case state.retries[key] do
- nil -> 1
- n -> n+1
- end
- if nretry < 3 do
- ask_random_peers(state, key)
- %{state | retries: Map.put(state.retries, key, nretry)}
- else
- for pid <- state.reqs[key] do
- GenServer.reply(pid, nil)
- end
- state = %{state | reqs: Map.delete(state.reqs, key)}
- state = %{state | retries: Map.delete(state.retries, key)}
- state
- end
- else
- state
- end
- end
- {:noreply, state}
- end
-
- def ask_random_peers(state, key) do
- peers = :ets.lookup(:shard_peer_db, state.shard_id)
- |> Enum.shuffle
- |> Enum.take(3)
- for {_, peer} <- peers do
- Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
- end
- end
-
-
- defimpl SData.PageStore do
- def put(store, page) do
- hash = GenServer.call(store.pid, {:put, page})
- { hash, store }
- end
-
- def get(store, hash) do
- try do
- GenServer.call(store.pid, {:get, hash, store.prefer_ask})
- catch
- :exit, {:timeout, _} -> nil
- end
- end
-
- def copy(store, other_store, hash) do
- page = SData.PageStore.get(other_store, hash)
- refs = SData.Page.refs(page)
- for ref <- refs do
- copy(store, other_store, ref)
- end
- GenServer.call(store.pid, {:put, page})
- store
- end
-
- def free(store, _hash) do
- store ## DO SOMETHING???
- end
- end
-end
diff --git a/lib/app/chat.ex b/lib/app/chat.ex
deleted file mode 100644
index 051fab6..0000000
--- a/lib/app/chat.ex
+++ /dev/null
@@ -1,208 +0,0 @@
-defmodule SApp.Chat do
- @moduledoc """
- Shard application for a replicated chat room with full history.
-
- Chat rooms are globally identified by their channel name.
- A chat room manifest is of the form:
-
- {:chat, channel_name}
-
- Future improvements:
- - message signing
- - storage of the chatroom messages to disk
- - storage of the known peers that have this channel to disk
- - use a DHT to find peers that are interested in this channel
- - epidemic broadcast (carefull not to be too costly,
- maybe by limiting the number of peers we talk to)
- - partial synchronization only == data distributed over peers
- """
-
- use GenServer
-
- require Logger
- alias SData.MerkleSearchTree, as: MST
-
- @doc """
- Start a process that connects to a given channel
- """
- def start_link(channel) do
- GenServer.start_link(__MODULE__, channel)
- end
-
- @doc """
- Initialize channel process.
- """
- def init(channel) do
- manifest = {:chat, channel}
- id = SData.term_hash manifest
-
- case Shard.Manager.register(id, manifest, self()) do
- :ok ->
- Shard.Manager.dispatch_to(id, nil, self())
- {:ok, block_store} = SApp.BlockStore.start_link(id, :block_store)
- mst = %MST{store: %SApp.BlockStore{pid: block_store},
- cmp: &msg_cmp/2}
- GenServer.cast(self(), :init_pull)
- {:ok,
- %{channel: channel,
- id: id,
- manifest: manifest,
- block_store: block_store,
- mst: mst,
- subs: MapSet.new,
- }
- }
- :redundant ->
- exit(:redundant)
- end
- end
-
- @doc """
- Implementation of the :manifest call that returns the chat room's manifest
- """
- def handle_call(:manifest, _from, state) do
- {:reply, state.manifest, state}
- end
-
- def handle_call({:read_history, top_bound, num}, _from, state) do
- ret = MST.last(state.mst, top_bound, num)
- {:reply, ret, state}
- end
-
- @doc """
- Implementation of the :init_pull handler, which is called when the
- process starts. It contacts all currently connected peers and asks them to
- send data for this channel if they have some.
- """
- def handle_cast(:init_pull, state) do
- for {_, pid, _, _} <- :ets.tab2list(:peer_db) do
- GenServer.cast(pid, {:send_msg, {:interested, [state.id]}})
- end
- {:noreply, state}
- end
-
- @doc """
- Implementation of the :chat_send handler. This is the main handler that is used
- to send a message to the chat room. Puts the message in the store and syncs
- with all connected peers.
- """
- def handle_cast({:chat_send, msg}, state) do
- msgitem = {(System.os_time :seconds),
- Shard.Identity.get_nickname(),
- msg}
- prev_root = state.mst.root
- mst = MST.insert(state.mst, msgitem)
- state = %{state | mst: mst}
-
- for pid <- state.subs do
- if Process.alive?(pid) do
- send(pid, {:chat_send, state.channel, msgitem})
- end
- end
-
- notif = {:append, prev_root, msgitem, mst.root}
- for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do
- Shard.Manager.send(peer_id, {state.id, nil, notif})
- end
-
- {:noreply, state}
- end
-
- @doc """
- Implementation of the :interested handler, this is called when a peer we are
- connected to asks to recieve data for this channel.
- """
- def handle_cast({:interested, peer_id}, state) do
- Shard.Manager.send(peer_id, {state.id, nil, {:root, state.mst.root}})
- {:noreply, state}
- end
-
- def handle_cast({:subscribe, pid}, state) do
- Process.monitor(pid)
- new_subs = MapSet.put(state.subs, pid)
- {:noreply, %{ state | subs: new_subs }}
- end
-
- @doc """
- Implementation of the :msg handler, which is the main handler for messages
- comming from other peers concerning this chat room.
-
- Messages are:
- - `{:get, start}`: get some messages starting at a given Merkle hash
- - `{:info, start, list, rest}`: put some messages and informs of the
- Merkle hash of the store of older messages.
- """
- def handle_cast({:msg, peer_id, _shard_id, nil, msg}, state) do
- state = case msg do
- {:get_manifest} ->
- Shard.Manager.send(peer_id, {state.id, nil, {:manifest, state.manifest}})
- state
- {:append, prev_root, msgitem, new_root} ->
- # Append message: one single mesage has arrived
- if new_root == state.mst.root do
- # We already have the message, do nothing
- state
- else
- # Try adding the message
- if prev_root == state.mst.root do
- mst2 = MST.insert(state.mst, msgitem)
- if mst2.root == new_root do
- # This was the only message missing, we are happy!
- state = %{state | mst: mst2}
- msg_callback(state, msgitem)
- state
- else
- # More messages are missing, start a full merge
- init_merge(state, new_root, peer_id)
- end
- else
- init_merge(state, new_root, peer_id)
- end
- end
- {:root, new_root} ->
- if new_root == state.mst.root do
- # already up to date, ignore
- state
- else
- init_merge(state, new_root, peer_id)
- end
- x ->
- Logger.info("Unhandled message: #{inspect x}")
- state
- end
- {:noreply, state}
- end
-
- defp init_merge(state, new_root, source_peer) do
- # TODO: make the merge asynchronous
- mgmst = %{state.mst | root: new_root}
- mgmst = put_in(mgmst.store.prefer_ask, [source_peer])
- mst = MST.merge(state.mst, mgmst, fn msgitem, true -> msg_callback(state, msgitem) end)
- %{state | mst: mst}
- end
-
- def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
- new_subs = MapSet.delete(state.subs, pid)
- {:noreply, %{ state | subs: new_subs }}
- end
-
- defp msg_callback(state, {ts, nick, msg}) do
- for pid <- state.subs do
- if Process.alive?(pid) do
- send(pid, {:chat_recv, state.channel, {ts, nick, msg}})
- end
- end
- end
-
- defp msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do
- cond do
- ts1 > ts2 -> :after
- ts1 < ts2 -> :before
- nick1 > nick2 -> :after
- nick1 < nick2 -> :before
- msg1 > msg2 -> :after
- msg1 < msg2 -> :before
- true -> :duplicate
- end
- end
-end
diff --git a/lib/application.ex b/lib/application.ex
deleted file mode 100644
index 3e3a6ac..0000000
--- a/lib/application.ex
+++ /dev/null
@@ -1,33 +0,0 @@
-defmodule Shard.Application do
- @moduledoc """
- Main Shard application.
-
- Shard is a prototype peer-to-peer comunication platform with data
- synchronization.
- """
-
- use Application
-
- def start(_type, _args) do
- import Supervisor.Spec, warn: false
-
- {listen_port, _} = Integer.parse ((System.get_env "PORT") || "4044")
-
- # Define workers and child supervisors to be supervised
- children = [
- Shard.Identity,
- { DynamicSupervisor, strategy: :one_for_one, name: Shard.DynamicSupervisor },
-
- # Networking
- { SNet.TCPServer, listen_port },
-
- # Applications & data store
- { Shard.Manager, listen_port },
- ]
-
- # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
- # for other strategies and supported options
- opts = [strategy: :one_for_one, name: Shard.Supervisor]
- Supervisor.start_link(children, opts)
- end
-end
diff --git a/lib/cli/cli.ex b/lib/cli/cli.ex
deleted file mode 100644
index 2fbf8c2..0000000
--- a/lib/cli/cli.ex
+++ /dev/null
@@ -1,98 +0,0 @@
-defmodule SCLI do
- @moduledoc """
- Small command line interface for the chat application
- """
-
- def run() do
- run(nil)
- end
-
- defp run(pid) do
- handle_messages()
-
- nick = Shard.Identity.get_nickname
- prompt = case pid do
- nil -> "(no channel) #{nick}: "
- _ ->
- {:chat, chan} = GenServer.call(pid, :manifest)
- "##{chan} #{nick}: "
- end
-
- str = prompt |> IO.gets |> String.trim
- cond do
- str == "/quit" ->
- nil
- String.slice(str, 0..0) == "/" ->
- command = str |> String.slice(1..-1) |> String.split(" ")
- pid2 = handle_command(pid, command)
- run(pid2)
- true ->
- if str != "" do
- GenServer.cast(pid, {:chat_send, str})
- end
- run(pid)
- end
- end
-
- defp handle_messages() do
- receive do
- {:chat_recv, chan, {ts, nick, msg}} ->
- IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} ##{chan} <#{nick}> #{msg}"
- handle_messages()
- {:chat_send, _, _} ->
- # do nothing
- handle_messages()
- after 10 -> nil
- end
- end
-
- defp handle_command(pid, ["connect", ipstr, portstr]) do
- {:ok, ip} = :inet.parse_address (to_charlist ipstr)
- {port, _} = Integer.parse portstr
- Shard.Manager.add_peer(ip, port)
- pid
- end
-
- defp handle_command(pid, ["list"]) do
- IO.puts "List of known channels:"
-
- for {_chid, manifest, _chpid} <- :ets.tab2list(:shard_db) do
- {:chat, chan} = manifest
- IO.puts "##{chan}"
- end
- pid
- end
-
- defp handle_command(pid, ["hist"]) do
- GenServer.call(pid, {:read_history, nil, 25})
- |> Enum.each(fn {{ts, nick, msg}, true} ->
- IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}"
- end)
- pid
- end
-
- defp handle_command(_pid, ["join", qchan]) do
- list = for {_chid, manifest, chpid} <- :ets.tab2list(:shard_db),
- {:chat, chan} = manifest,
- do: {chan, chpid}
- case List.keyfind(list, qchan, 0) do
- nil ->
- {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, qchan})
- GenServer.cast(pid, {:subscribe, self()})
- pid
- {_, pid} ->
- IO.puts "Switching to ##{qchan}"
- pid
- end
- end
-
- defp handle_command(pid, ["nick", nick]) do
- Shard.Identity.set_nickname nick
- pid
- end
-
- defp handle_command(pid, _cmd) do
- IO.puts "Invalid command"
- pid
- end
-end
diff --git a/lib/data/data.ex b/lib/data/data.ex
deleted file mode 100644
index c2c659d..0000000
--- a/lib/data/data.ex
+++ /dev/null
@@ -1,49 +0,0 @@
-defmodule SData do
- @moduledoc """
- Utility functions
-
- Compare functions are functions that compares stored items and provides a total order.
- They must return:
- - `:after` if the first argument is more recent
- - `:duplicate` if the two items are the same
- - `:before` if the first argument is older
- These functions must only return :duplicate for equal items.
- """
-
- @doc """
- Calculate the hash of an Erlang term by first converting it to its
- binary representation.
- """
- def term_hash(term, algo \\ :sha256) do
- :crypto.hash(algo, (:erlang.term_to_binary term))
- end
-
- @doc"""
- Compare function for arbitrary terms using the Erlang order
- """
- def cmp_term(a, b) do
- cond do
- a > b -> :after
- a < b -> :before
- a == b -> :duplicate
- end
- end
-
- @doc"""
- Compare function for timestamped strings
- """
- def cmp_ts_str({ts1, str1}, {ts2, str2}) do
- cond do
- ts1 > ts2 -> :after
- ts1 < ts2 -> :before
- str1 > str2 -> :after
- str1 < str2 -> :before
- true -> :duplicate
- end
- end
-
- @doc"""
- Merge function for nils
- """
- def merge_true(true, true), do: true
-end
diff --git a/lib/data/merklelist.ex b/lib/data/merklelist.ex
deleted file mode 100644
index 9b44ee8..0000000
--- a/lib/data/merklelist.ex
+++ /dev/null
@@ -1,143 +0,0 @@
-defmodule SData.MerkleList do
- @moduledoc"""
- A simple Merkle list store.
-
- Future improvements:
- - When messages are inserted other than at the top, all intermediate hashes
- change. Keep track of the mapping from old hashes to new hashes so that get
- requests can work even for hashes that are not valid anymore.
- - group items in "pages" (bigger bundles)
- """
-
- defstruct [:root, :top, :cmp, :store]
-
- @doc"""
- Create a Merkle list store.
-
- `cmp` is a compare function that respects the interface defined in module `SData`.
- """
- def new(cmp) do
- root_item = :root
- root_hash = SData.term_hash root_item
- state = %SData.MerkleList{
- root: root_hash,
- top: root_hash,
- cmp: cmp,
- store: %{ root_hash => root_item }
- }
- state
- end
-
- defp push(state, item) do
- new_item = {item, state.top}
- new_item_hash = SData.term_hash new_item
- new_store = Map.put(state.store, new_item_hash, new_item)
- %{ state | :top => new_item_hash, :store => new_store }
- end
-
- defp pop(state) do
- if state.top == state.root do
- :error
- else
- {item, next} = Map.get(state.store, state.top)
- new_store = Map.delete(state.store, state.top)
- new_state = %{ state | :top => next, :store => new_store }
- {:ok, item, new_state}
- end
- end
-
- @doc"""
- Insert a list of items in the store.
-
- A callback function may be specified that is called on any item
- that is sucessfully added, i.e. that wasn't present in the store before.
- """
- def insert_many(state, items, callback \\ (fn _ -> nil end)) do
- items_sorted = Enum.sort(items, fn (x, y) -> state.cmp.(x, y) == :after end)
- insert_many_aux(state, items_sorted, callback)
- end
-
- defp insert_many_aux(state, [], _callback) do
- state
- end
-
- defp insert_many_aux(state, [item | rest], callback) do
- case pop(state) do
- :error ->
- new_state = push(insert_many_aux(state, rest, callback), item)
- callback.(item)
- new_state
- {:ok, front, state_rest} ->
- case state.cmp.(item, front) do
- :after ->
- new_state = push(insert_many_aux(state, rest, callback), item)
- callback.(item)
- new_state
- :duplicate -> insert_many_aux(state, rest, callback)
- :before -> push(insert_many_aux(state_rest, [item | rest], callback), front)
- end
- end
- end
-
- @doc"""
- Insert a single item in the store.
-
- A callback function may be specified that is called on the item
- if it is sucessfully added, i.e. it wasn't present in the store before.
- """
- def insert(state, item, callback \\ (fn _ -> nil end)) do
- insert_many(state, [item], callback)
- end
-
- @doc"""
- Read some items from the state.
-
- The two parameters are optional:
- - qbegin : hash of the first item to read
- - qlimit : number of items to read
- """
- def read(state, qbegin \\ nil, qlimit \\ nil) do
- begin = qbegin || state.top
- limit = qlimit || 20
- get_items_list(state, begin, limit)
- end
-
- @doc"""
- Get the hash of the last item
- """
- def top(state) do
- state.top
- end
-
- @doc"""
- Get the hash of the root item
- """
- def root(state) do
- state.root
- end
-
- @doc"""
- Check if the store holds a certain item
- """
- def has(state, hash) do
- Map.has_key?(state.store, hash)
- end
-
- defp get_items_list(state, begin, limit) do
- case limit do
- 0 -> {:ok, [], begin}
- _ ->
- case Map.fetch(state.store, begin) do
- {:ok, :root} ->
- {:ok, [], nil }
- {:ok, {item, next}} ->
- case get_items_list(state, next, limit - 1) do
- {:ok, rest, past} ->
- {:ok, [ item | rest ], past }
- {:error, reason} -> {:error, reason}
- end
- :error -> {:error, begin}
- end
- end
- end
-end
diff --git a/lib/data/merklesearchtree.ex b/lib/data/merklesearchtree.ex
deleted file mode 100644
index 941d31d..0000000
--- a/lib/data/merklesearchtree.ex
+++ /dev/null
@@ -1,387 +0,0 @@
-defmodule SData.MerkleSearchTree do
- @moduledoc"""
- A Merkle search tree.
-
- A node of the tree is
- {
- level,
- hash_of_node | nil,
- [
- { item_low_bound, hash_of_node | nil },
- { item_low_bound, hash_of_node | nil },
- ...
- }
- }
- """
-
- alias SData.PageStore, as: Store
-
- @doc"""
- Create a new Merkle search tree.
-
- This structure can be used as a set with only true keys,
- or as a map if a merge function is given.
-
- `cmp` is a compare function for keys as defined in module `SData`.
-
- `merge` is a function for merging two items that have the same key.
- """
- defstruct root: nil,
- store: SData.LocalStore.new,
- cmp: &SData.cmp_term/2,
- merge: &SData.merge_true/2
-
-
- defmodule Page do
- defstruct [:level, :low, :list]
- end
-
- defimpl SData.Page, for: Page do
- def refs(page) do
- refs = for {_, _, h} <- page.list, h != nil, do: h
- if page.low != nil do
- [ page.low | refs ]
- else
- refs
- end
- end
- end
-
-
- @doc"""
- Insert an item into the search tree.
- """
- def insert(state, key, value \\ true) do
- level = calc_level(key)
- {hash, store} = insert_at(state, state.store, state.root, key, level, value)
- %{ state | root: hash, store: store }
- end
-
- defp insert_at(s, store, root, key, level, value) do
- {new_page, store} = if root == nil do
- { %Page{ level: level, low: nil, list: [ { key, value, nil } ] }, store }
- else
- %Page{ level: plevel, low: low, list: lst } = Store.get(store, root)
- [ { k0, _, _} | _ ] = lst
- cond do
- plevel < level ->
- {low, high, store} = split(s, store, root, key)
- { %Page{ level: level, low: low, list: [ { key, value, high } ] }, store }
- plevel == level ->
- store = Store.free(store, root)
- case s.cmp.(key, k0) do
- :before ->
- {low2a, low2b, store} = split(s, store, low, key)
- { %Page{ level: level, low: low2a, list: [ { key, value, low2b } | lst] }, store }
- _ ->
- {new_lst, store} = aux_insert_after_first(s, store, lst, key, value)
- { %Page{ level: plevel, low: low, list: new_lst }, store }
- end
- plevel > level ->
- store = Store.free(store, root)
- case s.cmp.(key, k0) do
- :before ->
- {new_low, store} = insert_at(s, store, low, key, level, value)
- { %Page{ level: plevel, low: new_low, list: lst }, store }
- :after ->
- {new_lst, store} = aux_insert_sub_after_first(s, store, lst, key, level, value)
- { %Page{ level: plevel, low: low, list: new_lst }, store }
- end
- end
- end
- Store.put(store, new_page) # returns {hash, store}
- end
-
- defp split(s, store, hash, key) do
- if hash == nil do
- {nil, nil, store}
- else
- %Page{ level: level, low: low, list: lst } = Store.get(store, hash) || Store.get(s.store, hash)
- store = Store.free(store, hash)
- [ { k0, _, _} | _ ] = lst
- case s.cmp.(key, k0) do
- :before ->
- {lowlow, lowhi, store} = split(s, store, low, key)
- newp2 = %Page{ level: level, low: lowhi, list: lst }
- {newp2h, store} = Store.put(store, newp2)
- {lowlow, newp2h, store}
- :after ->
- {lst1, p2, store} = split_aux(s, store, lst, key, level)
- newp1 = %Page{ level: level, low: low, list: lst1}
- {newp1h, store} = Store.put(store, newp1)
- {newp1h, p2, store}
- end
- end
- end
-
- defp split_aux(s, store, lst, key, level) do
- case lst do
- [ {k1, v1, r1} ] ->
- if s.cmp.(k1, key) == :duplicate do
- raise "Bad logic"
- end
- {r1l, r1h, store} = split(s, store, r1, key)
- { [{k1, v1, r1l}], r1h, store }
- [ {k1, v1, r1} = fst, {k2, v2, r2} = rst1 | rst ] ->
- case s.cmp.(key, k2) do
- :before ->
- {r1l, r1h, store} = split(s, store, r1, key)
- p2 = %Page{ level: level, low: r1h, list: [ {k2, v2, r2} | rst ] }
- {p2h, store} = Store.put(store, p2)
- { [{k1, v1, r1l}], p2h, store }
- :after ->
- {rst2, hi, store} = split_aux(s, store, [rst1 | rst], key, level)
- { [ fst | rst2 ], hi, store }
- :duplicate ->
- raise "Bad logic"
- end
- end
- end
-
- defp aux_insert_after_first(s, store, lst, key, value) do
- case lst do
- [ {k1, v1, r1} ] ->
- case s.cmp.(k1, key) do
- :duplicate ->
- { [ {k1, s.merge.(v1, value), r1} ], store }
- :before ->
- {r1a, r1b, new_store} = split(s, store, r1, key)
- { [ {k1, v1, r1a}, {key, value, r1b} ], new_store }
- end
- [ {k1, v1, r1} = fst, {k2, v2, r2} = rst1 | rst ] ->
- case s.cmp.(k1, key) do
- :duplicate ->
- { [ {k1, s.merge.(v1, value), r1}, rst1 | rst ], store }
- :before ->
- case s.cmp.(k2, key) do
- :after ->
- {r1a, r1b, new_store} = split(s, store, r1, key)
- { [ {k1, v1, r1a}, {key, value, r1b}, {k2, v2, r2} | rst ], new_store }
- _ ->
- {rst2, new_store} = aux_insert_after_first(s, store, [rst1 | rst], key, value)
- { [ fst | rst2 ], new_store }
- end
- end
- end
- end
-
- defp aux_insert_sub_after_first(s, store, lst, key, level, value) do
- case lst do
- [ {k1, v1, r1} ] ->
- if s.cmp.(k1, key) == :duplicate do
- raise "Bad logic"
- end
- {r1new, store_new} = insert_at(s, store, r1, key, level, value)
- { [{k1, v1, r1new}], store_new }
- [ {k1, v1, r1} = fst, {k2, _, _} = rst1 | rst ] ->
- if s.cmp.(k1, key) == :duplicate do
- raise "Bad logic"
- end
- case s.cmp.(key, k2) do
- :before ->
- {r1new, store_new} = insert_at(s, store, r1, key, level, value)
- { [{k1, v1, r1new}, rst1 | rst], store_new }
- _ ->
- {rst2, new_store} = aux_insert_sub_after_first(s, store, [rst1 |rst], key, level, value)
- { [ fst | rst2 ], new_store }
- end
- end
- end
-
- @doc"""
- Merge values from another MST in this MST.
-
- The merge is not symmetrical in the sense that:
- - new pages are added in the store of the first argument
- - the callback is called for all items found in the second argument and not the first
- """
- def merge(to, from, callback \\ fn _, _ -> nil end) do
- { store, root } = merge_aux(to, from, to.store, to.root, from.root, callback)
- %{ to | store: store, root: root }
- end
-
- defp merge_aux(s1, s2, store, r1, r2, callback) do
- case {r1, r2} do
- _ when r1 == r2 -> { store, r1 }
- {_, nil} ->
- { store, r1 }
- {nil, _} ->
- store = Store.copy(store, s2.store, r2)
- rec_callback(store, r2, callback)
- { store, r2 }
- _ ->
- %Page{ level: level1, low: low1, list: lst1 } = Store.get(store, r1)
- %Page{ level: level2, low: low2, list: lst2 } = Store.get(store, r2) || Store.get(s2.store, r2)
- { level, low1, lst1, low2, lst2 } = cond do
- level1 == level2 -> {level1, low1, lst1, low2, lst2}
- level1 > level2 -> {level1, low1, lst1, r2, []}
- level2 > level1 -> {level2, r1, [], low2, lst2}
- end
- { store, low, lst } = merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback)
- page = %Page{ level: level, low: low, list: lst }
- {hash, store} = Store.put(store, page)
- {store, hash}
- end
- end
-
- defp merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback) do
- case {lst1, lst2} do
- { [], [] } ->
- {store, hash} = merge_aux(s1, s2, store, low1, low2, callback)
- {store, hash, []}
- { [], [ {k, v, r} | rst2 ] } ->
- {low1l, low1h, store} = split(s1, store, low1, k)
- {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback)
- callback.(k, v)
- {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, [], r, rst2, callback)
- {store, newlow, [ {k, v, newr} | newrst ]}
- { [ {k, v, r} | rst1 ], [] } ->
- {low2l, low2h, store} = split(s2, store, low2, k)
- {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback)
- {store, newr, newrst} = merge_aux_rec(s1, s2, store, r, rst1, low2h, [], callback)
- {store, newlow, [ {k, v, newr} | newrst ]}
- { [ {k1, v1, r1} | rst1 ], [ {k2, v2, r2} | rst2 ] } ->
- case s1.cmp.(k1, k2) do
- :before ->
- {low2l, low2h, store} = split(s2, store, low2, k1)
- {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback)
- {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, low2h, lst2, callback)
- {store, newlow, [ {k1, v1, newr} | newrst ]}
- :after ->
- {low1l, low1h, store} = split(s1, store, low1, k2)
- {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback)
- callback.(k2, v2)
- {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, lst1, r2, rst2, callback)
- {store, newlow, [ {k2, v2, newr} | newrst ]}
- :duplicate ->
- {store, newlow} = merge_aux(s1, s2, store, low1, low2, callback)
- newv = s1.merge.(v1, v2) ## TODO: callback here ??
- {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, r2, rst2, callback)
- {store, newlow, [ {k1, newv, newr} | newrst ]}
- end
- end
- end
-
- defp rec_callback(store, root, callback) do
- case root do
- nil -> nil
- _ ->
- %Page{ level: _, low: low, list: lst } = Store.get(store, root)
- rec_callback(store, low, callback)
- for {k, v, rst} <- lst do
- callback.(k, v)
- rec_callback(store, rst, callback)
- end
- end
- end
-
- @doc"""
- Get value for a specific key in search tree.
- """
- def get(state, key) do
- get(state, state.root, key)
- end
-
- defp get(s, root, key) do
- case root do
- nil -> nil
- _ ->
- %Page{ level: _, low: low, list: lst } = Store.get(s.store, root)
- get_aux(s, low, lst, key)
- end
- end
-
- defp get_aux(s, low, lst, key) do
- case lst do
- [] ->
- get(s, low, key)
- [ {k, v, low2} | rst ] ->
- case s.cmp.(key, k) do
- :duplicate -> v
- :before ->
- get(s, low, key)
- :after ->
- get_aux(s, low2, rst, key)
- end
- end
- end
-
- @doc"""
- Get the last n items of the tree, or the last n items
- strictly before given upper bound if non nil
- """
- def last(state, top_bound, num) do
- last(state, state.root, top_bound, num)
- end
-
- defp last(s, root, top_bound, num) do
- case root do
- nil -> []
- _ ->
- %Page{ level: _, low: low, list: lst } = Store.get(s.store, root)
- last_aux(s, low, lst, top_bound, num)
- end
- end
-
- defp last_aux(s, low, lst, top_bound, num) do
- case lst do
- [] ->
- last(s, low, top_bound, num)
- [ {k, v, low2} | rst ] ->
- if top_bound == nil or s.cmp.(top_bound, k) == :after do
- items = last_aux(s, low2, rst, top_bound, num)
- items = if Enum.count(items) < num do
- [ {k, v} | items ]
- else
- items
- end
- cnt = Enum.count items
- if cnt < num do
- last(s, low, top_bound, num - cnt) ++ items
- else
- items
- end
- else
- last(s, low, top_bound, num)
- end
- end
- end
-
-
- @doc"""
- Dump Merkle search tree structure.
- """
- def dump(state) do
- dump(state.store, state.root, "")
- end
-
- defp dump(store, root, lvl) do
- case root do
- nil ->
- IO.puts(lvl <> "nil")
- _ ->
- %Page{ level: level, low: low, list: lst} = Store.get(store, root)
- IO.puts(lvl <> "#{root|>Base.encode16} (#{level})")
- dump(store, low, lvl <> " ")
- for {k, v, r} <- lst do
- IO.puts(lvl<>"- #{inspect k} => #{inspect v}")
- dump(store, r, lvl <> " ")
- end
- end
- end
-
- defp calc_level(key) do
- key
- |> SData.term_hash
- |> Base.encode16
- |> String.to_charlist
- |> count_leading_zeroes
- end
-
- defp count_leading_zeroes('0' ++ rest) do
- 1 + count_leading_zeroes(rest)
- end
- defp count_leading_zeroes(_) do
- 0
- end
-end
diff --git a/lib/data/store.ex b/lib/data/store.ex
deleted file mode 100644
index ca12cd0..0000000
--- a/lib/data/store.ex
+++ /dev/null
@@ -1,91 +0,0 @@
-defprotocol SData.Page do
- @moduledoc"""
- Protocol to be implemented by objects that are used as data pages
- in a pagestore and that may reference other data pages by their hash.
- """
-
- @fallback_to_any true
-
- @doc"""
- Get hashes of all pages referenced by this page.
- """
- def refs(page)
-end
-
-defimpl SData.Page, for: Any do
- def refs(_page), do: []
-end
-
-
-defprotocol SData.PageStore do
- @moduledoc"""
- Protocol to be implemented for page stores to allow their
- manipulation.
-
- This protocol may also be implemented by store proxies that track
- operations and implement different synchronization or caching mechanisms.
- """
-
- @doc"""
- Put a page. Argument is the content of the page, returns the
- hash that the store has associated to it.
-
- Returns {hash, store}
- """
- def put(store, page)
-
- @doc"""
- Get a page referenced by its hash.
-
- Returns page
- """
- def get(store, hash)
-
- @doc"""
- Copy to the store a page and all its references from the other store.
- In the case of pages on the network in a distributed store, this may
- be lazy.
-
- Returns store
- """
- def copy(store, other_store, hash)
-
- @doc"""
- Free a page referenced by its hash, marking it as no longer needed.
-
- Returns store
- """
- def free(store, hash)
-end
-
-
-defmodule SData.LocalStore do
- defstruct [:pages]
-
- def new() do
- %SData.LocalStore{ pages: %{} }
- end
-end
-
-defimpl SData.PageStore, for: SData.LocalStore do
- def put(store, page) do
- hash = SData.term_hash page
- store = %{ store | pages: Map.put(store.pages, hash, page) }
- { hash, store }
- end
-
- def get(store, hash) do
- store.pages[hash]
- end
-
- def copy(store, other_store, hash) do
- page = SData.PageStore.get(other_store, hash)
- refs = SData.Page.refs(page)
- store = Enum.reduce(refs, store, fn x, acc -> copy(acc, other_store, x) end)
- %{ store | pages: Map.put(store.pages, hash, page) }
- end
-
- def free(store, hash) do
- %{ store | pages: Map.delete(store.pages, hash) }
- end
-end
diff --git a/lib/identity.ex b/lib/identity.ex
deleted file mode 100644
index f1899df..0000000
--- a/lib/identity.ex
+++ /dev/null
@@ -1,46 +0,0 @@
-defmodule Shard.Identity do
- use Agent
- require Salty.Sign.Ed25519, as: Sign
- require Logger
-
- def start_link(_) do
- Agent.start_link(__MODULE__, :init, [], name: __MODULE__)
- end
-
- def init() do
- Logger.info "Generating keypair..."
- {pk, sk} = gen_keypair(Application.get_env(:shard, :peer_id_suffix))
- nick_suffix = pk
- |> binary_part(0, 3)
- |> Base.encode16
- |> String.downcase
- %{
- keypair: {pk, sk},
- nickname: "Anon" <> nick_suffix,
- }
- end
-
- defp gen_keypair(suffix, n \\ 0) do
- {:ok, pk, sk} = Sign.keypair
- if rem(n, 10000) == 0 do
- Logger.info "#{n}... expected #{:math.pow(256, byte_size(suffix))}"
- end
- if :binary.longest_common_suffix([pk, suffix]) == byte_size(suffix) do
- {pk, sk}
- else
- gen_keypair(suffix, n+1)
- end
- end
-
- def get_keypair() do
- Agent.get(__MODULE__, &(&1.keypair))
- end
-
- def get_nickname() do
- Agent.get(__MODULE__, &(&1.nickname))
- end
-
- def set_nickname(newnick) do
- Agent.update(__MODULE__, &(%{&1 | nickname: newnick}))
- end
-end
diff --git a/lib/manager.ex b/lib/manager.ex
deleted file mode 100644
index 82984d6..0000000
--- a/lib/manager.ex
+++ /dev/null
@@ -1,243 +0,0 @@
-defmodule Shard.Manager do
- @moduledoc"""
- Maintains several important tables :
-
- - :peer_db
-
- List of
- { id, {conn_pid, con_start, conn_n_msg} | nil, ip, port, last_seen }
-
- - :shard_db
-
- List of
- { id, manifest, pid | nil }
-
- - :shard_procs
-
- List of
- { {id, path}, pid }
-
- - :shard_peer_db
-
- Mult-list of
- { shard_id, peer_id }
-
-
- And an internal table :
-
- - :outbox
-
- Multi-list of
- { peer_id, message }
-
- """
-
- use GenServer
-
- require Logger
-
- def start_link(my_port) do
- GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
- end
-
- def init(my_port) do
- :ets.new(:peer_db, [:set, :protected, :named_table])
- :ets.new(:shard_db, [:set, :protected, :named_table])
- :ets.new(:shard_procs, [:set, :protected, :named_table])
- :ets.new(:shard_peer_db, [:bag, :protected, :named_table])
- outbox = :ets.new(:outbox, [:bag, :private])
-
- {:ok, %{my_port: my_port, outbox: outbox} }
- end
-
- def handle_call({:register, shard_id, manifest, pid}, _from, state) do
- will_live = case :ets.lookup(:shard_db, shard_id) do
- [{ ^shard_id, _, pid }] -> not Process.alive?(pid)
- _ -> true
- end
- reply = if will_live do
- Process.monitor(pid)
- :ets.insert(:shard_db, {shard_id, manifest, pid})
- :ok
- else
- :redundant
- end
- {:reply, reply, state}
- end
-
- def handle_cast({:dispatch_to, shard_id, path, pid}, state) do
- :ets.insert(:shard_procs, { {shard_id, path}, pid })
- Process.monitor(pid)
- {:noreply, state}
- end
-
- def handle_cast({:interested, peer_id, shards}, state) do
- for shard_id <- shards do
- case :ets.lookup(:shard_db, shard_id) do
- [{ ^shard_id, _, pid }] ->
- :ets.insert(:shard_peer_db, {shard_id, peer_id})
- GenServer.cast(pid, {:interested, peer_id})
- [] -> nil
- end
- end
- {:noreply, state}
- end
-
- def handle_cast({:not_interested, peer_id, shard_id}, state) do
- :ets.match_delete(:shard_peer_db, {shard_id, peer_id})
- {:noreply, state}
- end
-
- def handle_cast({:shard_peer_db_insert, shard_id, peer_id}, state) do
- :ets.insert(:shard_peer_db, {shard_id, peer_id})
- {:noreply, state}
- end
-
- def handle_cast({:peer_up, pk, pid, ip, port}, state) do
- for [pk2] <- :ets.match(:peer_db, {:'$1', :_, ip, port}) do
- if pk2 != pk do
- # obsolete peer information
- :ets.delete(:peer_db, pk2)
- :ets.match_delete(:shard_peer_db, {:_, pk2})
- end
- end
- :ets.insert(:peer_db, {pk, pid, ip, port})
-
- # Send interested message for all our shards
- id_list = (for {id, _, _} <- :ets.tab2list(:shard_db), do: id)
- GenServer.cast(pid, {:send_msg, {:interested, id_list}})
-
- # Send queued messages
- for {_, msg, _} <- :ets.lookup(state.outbox, pk) do
- GenServer.cast(pid, {:send_msg, msg})
- end
- :ets.delete(state.outbox, pk)
-
- {:noreply, state}
- end
-
- def handle_cast({:peer_down, pk, ip, port}, state) do
- :ets.insert(:peer_db, {pk, nil, ip, port})
- {:noreply, state}
- end
-
- def handle_cast({:connect_and_send, peer_id, msg}, state) do
- case :ets.lookup(:peer_db, peer_id) do
- [{^peer_id, nil, ip, port}] ->
- add_peer(ip, port, state)
- currtime = System.os_time :second
- :ets.insert(state.outbox, {peer_id, msg, currtime})
- outbox_cleanup = [{{:_, :_, :'$1'},
- [{:<, :'$1', currtime - 60}],
- [:'$1']}]
- :ets.select_delete(state.outbox, outbox_cleanup)
- _ ->
- Logger.info "Dropping message #{inspect msg} for peer #{inspect peer_id}: peer not in database"
- end
- {:noreply, state}
- end
-
- def handle_cast({:try_connect, pk_list}, state) do
- for pk <- pk_list do
- case :ets.lookup(:peer_db, pk) do
- [{^pk, nil, ip, port}] ->
- add_peer(ip, port, state)
- _ -> nil
- end
- end
- {:noreply, state}
- end
-
- def handle_cast({:add_peer, ip, port}, state) do
- add_peer(ip, port, state)
- {:noreply, state}
- end
-
- def handle_info({:DOWN, _, :process, pid, _}, state) do
- :ets.match_delete(:shard_procs, {:_, pid})
- {:noreply, state}
- end
-
- defp add_peer(ip, port, state) do
- spawn fn ->
- case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do
- {:ok, client} ->
- {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}})
- :ok = :gen_tcp.controlling_process(client, pid)
- _ ->
- Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped"
- end
- end
- end
-
-
- # ================
- # PUBLIC INTERFACE
- # ================
-
-
- @doc"""
- Connect to a peer specified by ip address and port
- """
- def add_peer(ip, port) do
- GenServer.cast(__MODULE__, {:add_peer, ip, port})
- end
-
- @doc"""
- Send message to a peer specified by peer id
- """
- def send(peer_id, msg) do
- case :ets.lookup(:peer_db, peer_id) do
- [{ ^peer_id, pid, _, _}] when pid != nil->
- GenServer.cast(pid, {:send_msg, msg})
- _ ->
- GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg})
- end
- end
-
- @doc"""
- Dispatch incoming message to correct shard process
- """
- def dispatch(peer_id, {shard_id, path, msg}) do
- case :ets.lookup(:shard_db, shard_id) do
- [] ->
- __MODULE__.send(peer_id, {:not_interested, shard_id})
- [_] ->
- case :ets.match(:shard_peer_db, {shard_id, peer_id}) do
- [] ->
- GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_id})
- _ -> nil
- end
- case :ets.lookup(:shard_procs, {shard_id, path}) do
- [{ {^shard_id, ^path}, pid }] ->
- GenServer.cast(pid, {:msg, peer_id, shard_id, path, msg})
- [] ->
- Logger.info("Warning: dropping message for #{inspect shard_id}/#{inspect path}, no handler running.\n\t#{inspect msg}")
- end
- end
- end
-
- def dispatch(peer_id, {:interested, shards}) do
- GenServer.cast(__MODULE__, {:interested, peer_id, shards})
- end
-
- def dispatch(peer_id, {:not_interested, shard}) do
- GenServer.cast(__MODULE__, {:not_interested, peer_id, shard})
- end
-
- @doc"""
- Register a process as the main process for a shard.
-
- Returns either :ok or :redundant, in which case the process must exit.
- """
- def register(shard_id, manifest, pid) do
- GenServer.call(__MODULE__, {:register, shard_id, manifest, pid})
- end
-
- @doc"""
- Register a process as the handler for shard packets for a given path.
- """
- def dispatch_to(shard_id, path, pid) do
- GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid})
- end
-end
diff --git a/lib/net/tcpconn.ex b/lib/net/tcpconn.ex
deleted file mode 100644
index 44669bb..0000000
--- a/lib/net/tcpconn.ex
+++ /dev/null
@@ -1,106 +0,0 @@
-defmodule SNet.TCPConn do
- use GenServer, restart: :temporary
- require Salty.Box.Curve25519xchacha20poly1305, as: Box
- require Salty.Sign.Ed25519, as: Sign
- require Logger
-
- def start_link(state) do
- GenServer.start_link(__MODULE__, state)
- end
-
- def init(state) do
- GenServer.cast(self(), :handshake)
- {:ok, state}
- end
-
- def handle_call(:get_host_str, _from, state) do
- {:reply, "#{state.his_pkey|>Base.encode16|>String.downcase}@#{to_string(:inet_parse.ntoa(state.addr))}:#{state.port}", state}
- end
-
- def handle_cast(:handshake, state) do
- socket = state.socket
-
- {srv_pkey, srv_skey} = Shard.Identity.get_keypair
- {:ok, sess_pkey, sess_skey} = Box.keypair
- {:ok, challenge} = Salty.Random.buf 32
-
- # Exchange public keys and challenge
- hello = {srv_pkey, sess_pkey, challenge, state.my_port}
- :gen_tcp.send(socket, :erlang.term_to_binary hello)
- {:ok, pkt} = :gen_tcp.recv(socket, 0)
- {cli_pkey, cli_sess_pkey, cli_challenge, his_port} = :erlang.binary_to_term(pkt, [:safe])
-
- # Do challenge and check their challenge
- {:ok, cli_challenge_sign} = Sign.sign_detached(cli_challenge, srv_skey)
- pkt = encode_pkt(cli_challenge_sign, cli_sess_pkey, sess_skey)
- :gen_tcp.send(socket, pkt)
-
- {:ok, pkt} = :gen_tcp.recv(socket, 0)
- challenge_sign = decode_pkt(pkt, cli_sess_pkey, sess_skey)
- :ok = Sign.verify_detached(challenge_sign, challenge, cli_pkey)
-
- expected_suffix = Application.get_env(:shard, :peer_id_suffix)
- len = byte_size(expected_suffix)
- ^len = :binary.longest_common_suffix([cli_pkey, expected_suffix])
-
- # Connected
- :inet.setopts(socket, [active: true])
-
- {:ok, {addr, port}} = :inet.peername socket
- state =%{ socket: socket,
- my_pkey: srv_pkey,
- my_skey: srv_skey,
- his_pkey: cli_pkey,
- conn_my_pkey: sess_pkey,
- conn_my_skey: sess_skey,
- conn_his_pkey: cli_sess_pkey,
- addr: addr,
- port: port,
- his_port: his_port
- }
- GenServer.cast(Shard.Manager, {:peer_up, cli_pkey, self(), addr, his_port})
- Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}"
-
- {:noreply, state}
- end
-
- def handle_cast({:send_msg, msg}, state) do
- msgbin = :erlang.term_to_binary msg
- enc = encode_pkt(msgbin, state.conn_his_pkey, state.conn_my_skey)
- :gen_tcp.send(state.socket, enc)
- {:noreply, state}
- end
-
- defp encode_pkt(pkt, pk, sk) do
- {:ok, n} = Salty.Random.buf Box.noncebytes
- {:ok, msg} = Box.easy(pkt, n, pk, sk)
- n <> msg
- end
-
- defp decode_pkt(pkt, pk, sk) do
- n = binary_part(pkt, 0, Box.noncebytes)
- enc = binary_part(pkt, Box.noncebytes, (byte_size pkt) - Box.noncebytes)
- {:ok, msg} = Box.open_easy(enc, n, pk, sk)
- msg
- end
-
- def handle_info({:tcp, _socket, raw_data}, state) do
- msg = decode_pkt(raw_data, state.conn_his_pkey, state.conn_my_skey)
- msg_data = :erlang.binary_to_term(msg, [:safe])
- Shard.Manager.dispatch(state.his_pkey, msg_data)
- {:noreply, state}
- end
-
- def handle_info({:tcp_closed, _socket}, state) do
- Logger.info "Disconnected: #{print_id state} at #{inspect state.addr}:#{state.port}"
- GenServer.cast(Shard.Manager, {:peer_down, state.his_pkey, state.addr, state.his_port})
- exit(:normal)
- end
-
- defp print_id(state) do
- state.his_pkey
- |> binary_part(0, 8)
- |> Base.encode16
- |> String.downcase
- end
-end
diff --git a/lib/net/tcpserver.ex b/lib/net/tcpserver.ex
deleted file mode 100644
index 46552a4..0000000
--- a/lib/net/tcpserver.ex
+++ /dev/null
@@ -1,26 +0,0 @@
-defmodule SNet.TCPServer do
- require Logger
- use Task, restart: :permanent
-
- def start_link(port) do
- Task.start_link(__MODULE__, :accept, [port])
- end
-
- @doc """
- Starts accepting connections on the given `port`.
- """
- def accept(port) do
- {:ok, socket} = :gen_tcp.listen(port,
- [:binary, packet: 2, active: false, reuseaddr: true])
- Logger.info "Accepting connections on port #{port}"
- loop_acceptor(socket, port)
- end
-
- defp loop_acceptor(socket, my_port) do
- {:ok, client} = :gen_tcp.accept(socket)
- {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port}})
- :ok = :gen_tcp.controlling_process(client, pid)
- loop_acceptor(socket, my_port)
- end
-end
-