aboutsummaryrefslogtreecommitdiff
path: root/shard
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2018-09-11 15:39:09 +0200
committerAlex Auvolat <alex@adnab.me>2018-09-11 15:39:09 +0200
commita033c82a3c656a8f53feb60b5b149680771ac247 (patch)
treebc3cb9a6954aebcfd1a0c5f61d367e1083802c3e /shard
parente92969db3f0a2093da16eb7db18c9db49225a719 (diff)
downloadshard-a033c82a3c656a8f53feb60b5b149680771ac247.tar.gz
shard-a033c82a3c656a8f53feb60b5b149680771ac247.zip
Use DETS to store shard & peer list to disk
Diffstat (limited to 'shard')
-rw-r--r--shard/config/config.exs4
-rw-r--r--shard/lib/app/blockstore.ex4
-rw-r--r--shard/lib/app/chat.ex19
-rw-r--r--shard/lib/cli/cli.ex10
-rw-r--r--shard/lib/identity.ex41
-rw-r--r--shard/lib/manager.ex77
-rw-r--r--shard/lib/net/tcpconn.ex4
7 files changed, 113 insertions, 46 deletions
diff --git a/shard/config/config.exs b/shard/config/config.exs
index 5a08e12..f2a7f09 100644
--- a/shard/config/config.exs
+++ b/shard/config/config.exs
@@ -30,11 +30,11 @@ use Mix.Config
# On first run, the instance will try to generate a peer id that
# has this suffix. This is done by brute-force testing, therefore
# it is not recommended to use long suffixes.
-config :shard, peer_id_suffix: "S"
+config :shard, peer_id_suffix: "SH"
# Data directory
# ==============
-config :shard, data_directory: Path.join [System.user_home, "shard", "data"]
+config :shard, data_path: Path.join [System.user_home, "shard", "data"]
# It is also possible to import configuration files, relative to this
diff --git a/shard/lib/app/blockstore.ex b/shard/lib/app/blockstore.ex
index 8e4fddc..5e93135 100644
--- a/shard/lib/app/blockstore.ex
+++ b/shard/lib/app/blockstore.ex
@@ -109,10 +109,10 @@ defmodule SApp.BlockStore do
end
def ask_random_peers(state, key) do
- peers = :ets.lookup(:shard_peer_db, state.shard_id)
+ peers = Shard.Manager.get_shard_peers(state.shard_id)
|> Enum.shuffle
|> Enum.take(3)
- for {_, peer} <- peers do
+ for peer <- peers do
Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
end
end
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex
index 051fab6..471d8f7 100644
--- a/shard/lib/app/chat.ex
+++ b/shard/lib/app/chat.ex
@@ -10,7 +10,6 @@ defmodule SApp.Chat do
Future improvements:
- message signing
- storage of the chatroom messages to disk
- - storage of the known peers that have this channel to disk
- use a DHT to find peers that are interested in this channel
- epidemic broadcast (carefull not to be too costly,
maybe by limiting the number of peers we talk to)
@@ -22,6 +21,18 @@ defmodule SApp.Chat do
require Logger
alias SData.MerkleSearchTree, as: MST
+
+ defmodule Manifest do
+ defstruct [:channel]
+ end
+
+ defimpl Shard.Manifest, for: Manifest do
+ def start(m) do
+ SApp.Chat.start_link(m.channel)
+ end
+ end
+
+
@doc """
Start a process that connects to a given channel
"""
@@ -33,7 +44,7 @@ defmodule SApp.Chat do
Initialize channel process.
"""
def init(channel) do
- manifest = {:chat, channel}
+ manifest = %Manifest{channel: channel}
id = SData.term_hash manifest
case Shard.Manager.register(id, manifest, self()) do
@@ -75,7 +86,7 @@ defmodule SApp.Chat do
send data for this channel if they have some.
"""
def handle_cast(:init_pull, state) do
- for {_, pid, _, _} <- :ets.tab2list(:peer_db) do
+ for {_, pid, _, _} <- Shard.Manager.list_peers do
GenServer.cast(pid, {:send_msg, {:interested, [state.id]}})
end
{:noreply, state}
@@ -101,7 +112,7 @@ defmodule SApp.Chat do
end
notif = {:append, prev_root, msgitem, mst.root}
- for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do
+ for peer_id <- Shard.Manager.get_shard_peers(state.id) do
Shard.Manager.send(peer_id, {state.id, nil, notif})
end
diff --git a/shard/lib/cli/cli.ex b/shard/lib/cli/cli.ex
index 2fbf8c2..c3afe8f 100644
--- a/shard/lib/cli/cli.ex
+++ b/shard/lib/cli/cli.ex
@@ -14,7 +14,7 @@ defmodule SCLI do
prompt = case pid do
nil -> "(no channel) #{nick}: "
_ ->
- {:chat, chan} = GenServer.call(pid, :manifest)
+ %SApp.Chat.Manifest{channel: chan} = GenServer.call(pid, :manifest)
"##{chan} #{nick}: "
end
@@ -56,8 +56,8 @@ defmodule SCLI do
defp handle_command(pid, ["list"]) do
IO.puts "List of known channels:"
- for {_chid, manifest, _chpid} <- :ets.tab2list(:shard_db) do
- {:chat, chan} = manifest
+ for {_chid, manifest, _chpid} <- Shard.Manager.list_shards do
+ %SApp.Chat.Manifest{channel: chan} = manifest
IO.puts "##{chan}"
end
pid
@@ -72,8 +72,8 @@ defmodule SCLI do
end
defp handle_command(_pid, ["join", qchan]) do
- list = for {_chid, manifest, chpid} <- :ets.tab2list(:shard_db),
- {:chat, chan} = manifest,
+ list = for {_chid, manifest, chpid} <- Shard.Manager.list_shards,
+ %SApp.Chat.Manifest{channel: chan} = manifest,
do: {chan, chpid}
case List.keyfind(list, qchan, 0) do
nil ->
diff --git a/shard/lib/identity.ex b/shard/lib/identity.ex
index f1899df..01b3c9e 100644
--- a/shard/lib/identity.ex
+++ b/shard/lib/identity.ex
@@ -3,21 +3,36 @@ defmodule Shard.Identity do
require Salty.Sign.Ed25519, as: Sign
require Logger
+ @identity_db [Application.get_env(:shard, :data_path), "identity_db"] |> Path.join |> String.to_atom
+
def start_link(_) do
Agent.start_link(__MODULE__, :init, [], name: __MODULE__)
end
def init() do
- Logger.info "Generating keypair..."
- {pk, sk} = gen_keypair(Application.get_env(:shard, :peer_id_suffix))
- nick_suffix = pk
- |> binary_part(0, 3)
- |> Base.encode16
- |> String.downcase
- %{
- keypair: {pk, sk},
- nickname: "Anon" <> nick_suffix,
- }
+ :dets.start
+ {:ok, @identity_db} = :dets.open_file @identity_db, type: :set
+
+ case :dets.match @identity_db, :"$1" do
+ [] ->
+ Logger.info "Generating keypair..."
+ {pk, sk} = gen_keypair(Application.get_env(:shard, :peer_id_suffix))
+ nick_suffix = pk
+ |> binary_part(0, 3)
+ |> Base.encode16
+ |> String.downcase
+ nick = "Anon" <> nick_suffix
+ :dets.insert @identity_db, {pk, sk, nick}
+ %{
+ keypair: {pk, sk},
+ nickname: nick
+ }
+ [[{pk, sk, nick}] | _] ->
+ %{
+ keypair: {pk, sk},
+ nickname: nick
+ }
+ end
end
defp gen_keypair(suffix, n \\ 0) do
@@ -41,6 +56,10 @@ defmodule Shard.Identity do
end
def set_nickname(newnick) do
- Agent.update(__MODULE__, &(%{&1 | nickname: newnick}))
+ Agent.update(__MODULE__, fn state ->
+ {pk, sk} = state.keypair
+ :dets.insert @identity_db, {pk, sk, newnick}
+ %{state | nickname: newnick}
+ end)
end
end
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex
index 82984d6..3f2bddb 100644
--- a/shard/lib/manager.ex
+++ b/shard/lib/manager.ex
@@ -1,3 +1,8 @@
+defprotocol Shard.Manifest do
+ @doc "Start the corresponding Shard process"
+ def start(manifest)
+end
+
defmodule Shard.Manager do
@moduledoc"""
Maintains several important tables :
@@ -5,7 +10,7 @@ defmodule Shard.Manager do
- :peer_db
List of
- { id, {conn_pid, con_start, conn_n_msg} | nil, ip, port, last_seen }
+ { id, pid | nil, ip, port }
- :shard_db
@@ -36,28 +41,44 @@ defmodule Shard.Manager do
require Logger
+ @peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom
+ @shard_db [Application.get_env(:shard, :data_path), "shard_db"] |> Path.join |> String.to_atom
+ @shard_peer_db [Application.get_env(:shard, :data_path), "shard_peer_db"] |> Path.join |> String.to_atom
+
def start_link(my_port) do
GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
end
def init(my_port) do
- :ets.new(:peer_db, [:set, :protected, :named_table])
- :ets.new(:shard_db, [:set, :protected, :named_table])
+ :dets.open_file(@peer_db, [type: :set])
+ for [{id, _pid, ip, port}] <- :dets.match @peer_db, :"$1" do
+ :dets.insert @peer_db, {id, nil, ip, port}
+ # connect blindly to everyone
+ add_peer(ip, port)
+ end
+
+ :dets.open_file(@shard_db, [type: :set])
+ for [{id, manifest, _pid}] <- :dets.match @shard_db, :"$1" do
+ :dets.insert @shard_db, {id, manifest, nil}
+ spawn fn -> Shard.Manifest.start manifest end
+ end
+
+ :dets.open_file(@shard_peer_db, [type: :bag])
+
:ets.new(:shard_procs, [:set, :protected, :named_table])
- :ets.new(:shard_peer_db, [:bag, :protected, :named_table])
outbox = :ets.new(:outbox, [:bag, :private])
{:ok, %{my_port: my_port, outbox: outbox} }
end
def handle_call({:register, shard_id, manifest, pid}, _from, state) do
- will_live = case :ets.lookup(:shard_db, shard_id) do
- [{ ^shard_id, _, pid }] -> not Process.alive?(pid)
+ will_live = case :dets.lookup(@shard_db, shard_id) do
+ [{ ^shard_id, _, pid }] when pid != nil -> not Process.alive?(pid)
_ -> true
end
reply = if will_live do
Process.monitor(pid)
- :ets.insert(:shard_db, {shard_id, manifest, pid})
+ :dets.insert(@shard_db, {shard_id, manifest, pid})
:ok
else
:redundant
@@ -73,9 +94,9 @@ defmodule Shard.Manager do
def handle_cast({:interested, peer_id, shards}, state) do
for shard_id <- shards do
- case :ets.lookup(:shard_db, shard_id) do
+ case :dets.lookup(@shard_db, shard_id) do
[{ ^shard_id, _, pid }] ->
- :ets.insert(:shard_peer_db, {shard_id, peer_id})
+ :dets.insert(@shard_peer_db, {shard_id, peer_id})
GenServer.cast(pid, {:interested, peer_id})
[] -> nil
end
@@ -84,27 +105,27 @@ defmodule Shard.Manager do
end
def handle_cast({:not_interested, peer_id, shard_id}, state) do
- :ets.match_delete(:shard_peer_db, {shard_id, peer_id})
+ :dets.match_delete(@shard_peer_db, {shard_id, peer_id})
{:noreply, state}
end
def handle_cast({:shard_peer_db_insert, shard_id, peer_id}, state) do
- :ets.insert(:shard_peer_db, {shard_id, peer_id})
+ :dets.insert(@shard_peer_db, {shard_id, peer_id})
{:noreply, state}
end
def handle_cast({:peer_up, pk, pid, ip, port}, state) do
- for [pk2] <- :ets.match(:peer_db, {:'$1', :_, ip, port}) do
+ for [pk2] <- :dets.match(@peer_db, {:'$1', :_, ip, port}) do
if pk2 != pk do
# obsolete peer information
- :ets.delete(:peer_db, pk2)
- :ets.match_delete(:shard_peer_db, {:_, pk2})
+ :dets.delete(@peer_db, pk2)
+ :dets.match_delete(@shard_peer_db, {:_, pk2})
end
end
- :ets.insert(:peer_db, {pk, pid, ip, port})
+ :dets.insert(@peer_db, {pk, pid, ip, port})
# Send interested message for all our shards
- id_list = (for {id, _, _} <- :ets.tab2list(:shard_db), do: id)
+ id_list = (for [{id, _, _}] <- :dets.match(@shard_db, :"$1"), do: id)
GenServer.cast(pid, {:send_msg, {:interested, id_list}})
# Send queued messages
@@ -117,12 +138,12 @@ defmodule Shard.Manager do
end
def handle_cast({:peer_down, pk, ip, port}, state) do
- :ets.insert(:peer_db, {pk, nil, ip, port})
+ :dets.insert(@peer_db, {pk, nil, ip, port})
{:noreply, state}
end
def handle_cast({:connect_and_send, peer_id, msg}, state) do
- case :ets.lookup(:peer_db, peer_id) do
+ case :dets.lookup(@peer_db, peer_id) do
[{^peer_id, nil, ip, port}] ->
add_peer(ip, port, state)
currtime = System.os_time :second
@@ -139,7 +160,7 @@ defmodule Shard.Manager do
def handle_cast({:try_connect, pk_list}, state) do
for pk <- pk_list do
- case :ets.lookup(:peer_db, pk) do
+ case :dets.lookup(@peer_db, pk) do
[{^pk, nil, ip, port}] ->
add_peer(ip, port, state)
_ -> nil
@@ -187,7 +208,7 @@ defmodule Shard.Manager do
Send message to a peer specified by peer id
"""
def send(peer_id, msg) do
- case :ets.lookup(:peer_db, peer_id) do
+ case :dets.lookup(@peer_db, peer_id) do
[{ ^peer_id, pid, _, _}] when pid != nil->
GenServer.cast(pid, {:send_msg, msg})
_ ->
@@ -199,11 +220,11 @@ defmodule Shard.Manager do
Dispatch incoming message to correct shard process
"""
def dispatch(peer_id, {shard_id, path, msg}) do
- case :ets.lookup(:shard_db, shard_id) do
+ case :dets.lookup(@shard_db, shard_id) do
[] ->
__MODULE__.send(peer_id, {:not_interested, shard_id})
[_] ->
- case :ets.match(:shard_peer_db, {shard_id, peer_id}) do
+ case :dets.match(@shard_peer_db, {shard_id, peer_id}) do
[] ->
GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_id})
_ -> nil
@@ -240,4 +261,16 @@ defmodule Shard.Manager do
def dispatch_to(shard_id, path, pid) do
GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid})
end
+
+ def list_shards() do
+ for [x] <- :dets.match(@shard_db, :"$1"), do: x
+ end
+
+ def list_peers() do
+ for [x] <- :dets.match(@peer_db, :"$1"), do: x
+ end
+
+ def get_shard_peers(shard_id) do
+ for [x] <- :dets.match(@shard_peer_db, {shard_id, :"$1"}), do: x
+ end
end
diff --git a/shard/lib/net/tcpconn.ex b/shard/lib/net/tcpconn.ex
index 44669bb..35f7ea5 100644
--- a/shard/lib/net/tcpconn.ex
+++ b/shard/lib/net/tcpconn.ex
@@ -43,6 +43,10 @@ defmodule SNet.TCPConn do
len = byte_size(expected_suffix)
^len = :binary.longest_common_suffix([cli_pkey, expected_suffix])
+ if srv_pkey == cli_pkey do
+ exit :normal
+ end
+
# Connected
:inet.setopts(socket, [active: true])