From a033c82a3c656a8f53feb60b5b149680771ac247 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 11 Sep 2018 15:39:09 +0200 Subject: Use DETS to store shard & peer list to disk --- shard/lib/app/blockstore.ex | 4 +-- shard/lib/app/chat.ex | 19 ++++++++--- shard/lib/cli/cli.ex | 10 +++--- shard/lib/identity.ex | 41 +++++++++++++++++------- shard/lib/manager.ex | 77 ++++++++++++++++++++++++++++++++------------- shard/lib/net/tcpconn.ex | 4 +++ 6 files changed, 111 insertions(+), 44 deletions(-) (limited to 'shard/lib') diff --git a/shard/lib/app/blockstore.ex b/shard/lib/app/blockstore.ex index 8e4fddc..5e93135 100644 --- a/shard/lib/app/blockstore.ex +++ b/shard/lib/app/blockstore.ex @@ -109,10 +109,10 @@ defmodule SApp.BlockStore do end def ask_random_peers(state, key) do - peers = :ets.lookup(:shard_peer_db, state.shard_id) + peers = Shard.Manager.get_shard_peers(state.shard_id) |> Enum.shuffle |> Enum.take(3) - for {_, peer} <- peers do + for peer <- peers do Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) end end diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex index 051fab6..471d8f7 100644 --- a/shard/lib/app/chat.ex +++ b/shard/lib/app/chat.ex @@ -10,7 +10,6 @@ defmodule SApp.Chat do Future improvements: - message signing - storage of the chatroom messages to disk - - storage of the known peers that have this channel to disk - use a DHT to find peers that are interested in this channel - epidemic broadcast (carefull not to be too costly, maybe by limiting the number of peers we talk to) @@ -22,6 +21,18 @@ defmodule SApp.Chat do require Logger alias SData.MerkleSearchTree, as: MST + + defmodule Manifest do + defstruct [:channel] + end + + defimpl Shard.Manifest, for: Manifest do + def start(m) do + SApp.Chat.start_link(m.channel) + end + end + + @doc """ Start a process that connects to a given channel """ @@ -33,7 +44,7 @@ defmodule SApp.Chat do Initialize channel process. """ def init(channel) do - manifest = {:chat, channel} + manifest = %Manifest{channel: channel} id = SData.term_hash manifest case Shard.Manager.register(id, manifest, self()) do @@ -75,7 +86,7 @@ defmodule SApp.Chat do send data for this channel if they have some. """ def handle_cast(:init_pull, state) do - for {_, pid, _, _} <- :ets.tab2list(:peer_db) do + for {_, pid, _, _} <- Shard.Manager.list_peers do GenServer.cast(pid, {:send_msg, {:interested, [state.id]}}) end {:noreply, state} @@ -101,7 +112,7 @@ defmodule SApp.Chat do end notif = {:append, prev_root, msgitem, mst.root} - for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do + for peer_id <- Shard.Manager.get_shard_peers(state.id) do Shard.Manager.send(peer_id, {state.id, nil, notif}) end diff --git a/shard/lib/cli/cli.ex b/shard/lib/cli/cli.ex index 2fbf8c2..c3afe8f 100644 --- a/shard/lib/cli/cli.ex +++ b/shard/lib/cli/cli.ex @@ -14,7 +14,7 @@ defmodule SCLI do prompt = case pid do nil -> "(no channel) #{nick}: " _ -> - {:chat, chan} = GenServer.call(pid, :manifest) + %SApp.Chat.Manifest{channel: chan} = GenServer.call(pid, :manifest) "##{chan} #{nick}: " end @@ -56,8 +56,8 @@ defmodule SCLI do defp handle_command(pid, ["list"]) do IO.puts "List of known channels:" - for {_chid, manifest, _chpid} <- :ets.tab2list(:shard_db) do - {:chat, chan} = manifest + for {_chid, manifest, _chpid} <- Shard.Manager.list_shards do + %SApp.Chat.Manifest{channel: chan} = manifest IO.puts "##{chan}" end pid @@ -72,8 +72,8 @@ defmodule SCLI do end defp handle_command(_pid, ["join", qchan]) do - list = for {_chid, manifest, chpid} <- :ets.tab2list(:shard_db), - {:chat, chan} = manifest, + list = for {_chid, manifest, chpid} <- Shard.Manager.list_shards, + %SApp.Chat.Manifest{channel: chan} = manifest, do: {chan, chpid} case List.keyfind(list, qchan, 0) do nil -> diff --git a/shard/lib/identity.ex b/shard/lib/identity.ex index f1899df..01b3c9e 100644 --- a/shard/lib/identity.ex +++ b/shard/lib/identity.ex @@ -3,21 +3,36 @@ defmodule Shard.Identity do require Salty.Sign.Ed25519, as: Sign require Logger + @identity_db [Application.get_env(:shard, :data_path), "identity_db"] |> Path.join |> String.to_atom + def start_link(_) do Agent.start_link(__MODULE__, :init, [], name: __MODULE__) end def init() do - Logger.info "Generating keypair..." - {pk, sk} = gen_keypair(Application.get_env(:shard, :peer_id_suffix)) - nick_suffix = pk - |> binary_part(0, 3) - |> Base.encode16 - |> String.downcase - %{ - keypair: {pk, sk}, - nickname: "Anon" <> nick_suffix, - } + :dets.start + {:ok, @identity_db} = :dets.open_file @identity_db, type: :set + + case :dets.match @identity_db, :"$1" do + [] -> + Logger.info "Generating keypair..." + {pk, sk} = gen_keypair(Application.get_env(:shard, :peer_id_suffix)) + nick_suffix = pk + |> binary_part(0, 3) + |> Base.encode16 + |> String.downcase + nick = "Anon" <> nick_suffix + :dets.insert @identity_db, {pk, sk, nick} + %{ + keypair: {pk, sk}, + nickname: nick + } + [[{pk, sk, nick}] | _] -> + %{ + keypair: {pk, sk}, + nickname: nick + } + end end defp gen_keypair(suffix, n \\ 0) do @@ -41,6 +56,10 @@ defmodule Shard.Identity do end def set_nickname(newnick) do - Agent.update(__MODULE__, &(%{&1 | nickname: newnick})) + Agent.update(__MODULE__, fn state -> + {pk, sk} = state.keypair + :dets.insert @identity_db, {pk, sk, newnick} + %{state | nickname: newnick} + end) end end diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex index 82984d6..3f2bddb 100644 --- a/shard/lib/manager.ex +++ b/shard/lib/manager.ex @@ -1,3 +1,8 @@ +defprotocol Shard.Manifest do + @doc "Start the corresponding Shard process" + def start(manifest) +end + defmodule Shard.Manager do @moduledoc""" Maintains several important tables : @@ -5,7 +10,7 @@ defmodule Shard.Manager do - :peer_db List of - { id, {conn_pid, con_start, conn_n_msg} | nil, ip, port, last_seen } + { id, pid | nil, ip, port } - :shard_db @@ -36,28 +41,44 @@ defmodule Shard.Manager do require Logger + @peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom + @shard_db [Application.get_env(:shard, :data_path), "shard_db"] |> Path.join |> String.to_atom + @shard_peer_db [Application.get_env(:shard, :data_path), "shard_peer_db"] |> Path.join |> String.to_atom + def start_link(my_port) do GenServer.start_link(__MODULE__, my_port, name: __MODULE__) end def init(my_port) do - :ets.new(:peer_db, [:set, :protected, :named_table]) - :ets.new(:shard_db, [:set, :protected, :named_table]) + :dets.open_file(@peer_db, [type: :set]) + for [{id, _pid, ip, port}] <- :dets.match @peer_db, :"$1" do + :dets.insert @peer_db, {id, nil, ip, port} + # connect blindly to everyone + add_peer(ip, port) + end + + :dets.open_file(@shard_db, [type: :set]) + for [{id, manifest, _pid}] <- :dets.match @shard_db, :"$1" do + :dets.insert @shard_db, {id, manifest, nil} + spawn fn -> Shard.Manifest.start manifest end + end + + :dets.open_file(@shard_peer_db, [type: :bag]) + :ets.new(:shard_procs, [:set, :protected, :named_table]) - :ets.new(:shard_peer_db, [:bag, :protected, :named_table]) outbox = :ets.new(:outbox, [:bag, :private]) {:ok, %{my_port: my_port, outbox: outbox} } end def handle_call({:register, shard_id, manifest, pid}, _from, state) do - will_live = case :ets.lookup(:shard_db, shard_id) do - [{ ^shard_id, _, pid }] -> not Process.alive?(pid) + will_live = case :dets.lookup(@shard_db, shard_id) do + [{ ^shard_id, _, pid }] when pid != nil -> not Process.alive?(pid) _ -> true end reply = if will_live do Process.monitor(pid) - :ets.insert(:shard_db, {shard_id, manifest, pid}) + :dets.insert(@shard_db, {shard_id, manifest, pid}) :ok else :redundant @@ -73,9 +94,9 @@ defmodule Shard.Manager do def handle_cast({:interested, peer_id, shards}, state) do for shard_id <- shards do - case :ets.lookup(:shard_db, shard_id) do + case :dets.lookup(@shard_db, shard_id) do [{ ^shard_id, _, pid }] -> - :ets.insert(:shard_peer_db, {shard_id, peer_id}) + :dets.insert(@shard_peer_db, {shard_id, peer_id}) GenServer.cast(pid, {:interested, peer_id}) [] -> nil end @@ -84,27 +105,27 @@ defmodule Shard.Manager do end def handle_cast({:not_interested, peer_id, shard_id}, state) do - :ets.match_delete(:shard_peer_db, {shard_id, peer_id}) + :dets.match_delete(@shard_peer_db, {shard_id, peer_id}) {:noreply, state} end def handle_cast({:shard_peer_db_insert, shard_id, peer_id}, state) do - :ets.insert(:shard_peer_db, {shard_id, peer_id}) + :dets.insert(@shard_peer_db, {shard_id, peer_id}) {:noreply, state} end def handle_cast({:peer_up, pk, pid, ip, port}, state) do - for [pk2] <- :ets.match(:peer_db, {:'$1', :_, ip, port}) do + for [pk2] <- :dets.match(@peer_db, {:'$1', :_, ip, port}) do if pk2 != pk do # obsolete peer information - :ets.delete(:peer_db, pk2) - :ets.match_delete(:shard_peer_db, {:_, pk2}) + :dets.delete(@peer_db, pk2) + :dets.match_delete(@shard_peer_db, {:_, pk2}) end end - :ets.insert(:peer_db, {pk, pid, ip, port}) + :dets.insert(@peer_db, {pk, pid, ip, port}) # Send interested message for all our shards - id_list = (for {id, _, _} <- :ets.tab2list(:shard_db), do: id) + id_list = (for [{id, _, _}] <- :dets.match(@shard_db, :"$1"), do: id) GenServer.cast(pid, {:send_msg, {:interested, id_list}}) # Send queued messages @@ -117,12 +138,12 @@ defmodule Shard.Manager do end def handle_cast({:peer_down, pk, ip, port}, state) do - :ets.insert(:peer_db, {pk, nil, ip, port}) + :dets.insert(@peer_db, {pk, nil, ip, port}) {:noreply, state} end def handle_cast({:connect_and_send, peer_id, msg}, state) do - case :ets.lookup(:peer_db, peer_id) do + case :dets.lookup(@peer_db, peer_id) do [{^peer_id, nil, ip, port}] -> add_peer(ip, port, state) currtime = System.os_time :second @@ -139,7 +160,7 @@ defmodule Shard.Manager do def handle_cast({:try_connect, pk_list}, state) do for pk <- pk_list do - case :ets.lookup(:peer_db, pk) do + case :dets.lookup(@peer_db, pk) do [{^pk, nil, ip, port}] -> add_peer(ip, port, state) _ -> nil @@ -187,7 +208,7 @@ defmodule Shard.Manager do Send message to a peer specified by peer id """ def send(peer_id, msg) do - case :ets.lookup(:peer_db, peer_id) do + case :dets.lookup(@peer_db, peer_id) do [{ ^peer_id, pid, _, _}] when pid != nil-> GenServer.cast(pid, {:send_msg, msg}) _ -> @@ -199,11 +220,11 @@ defmodule Shard.Manager do Dispatch incoming message to correct shard process """ def dispatch(peer_id, {shard_id, path, msg}) do - case :ets.lookup(:shard_db, shard_id) do + case :dets.lookup(@shard_db, shard_id) do [] -> __MODULE__.send(peer_id, {:not_interested, shard_id}) [_] -> - case :ets.match(:shard_peer_db, {shard_id, peer_id}) do + case :dets.match(@shard_peer_db, {shard_id, peer_id}) do [] -> GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_id}) _ -> nil @@ -240,4 +261,16 @@ defmodule Shard.Manager do def dispatch_to(shard_id, path, pid) do GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid}) end + + def list_shards() do + for [x] <- :dets.match(@shard_db, :"$1"), do: x + end + + def list_peers() do + for [x] <- :dets.match(@peer_db, :"$1"), do: x + end + + def get_shard_peers(shard_id) do + for [x] <- :dets.match(@shard_peer_db, {shard_id, :"$1"}), do: x + end end diff --git a/shard/lib/net/tcpconn.ex b/shard/lib/net/tcpconn.ex index 44669bb..35f7ea5 100644 --- a/shard/lib/net/tcpconn.ex +++ b/shard/lib/net/tcpconn.ex @@ -43,6 +43,10 @@ defmodule SNet.TCPConn do len = byte_size(expected_suffix) ^len = :binary.longest_common_suffix([cli_pkey, expected_suffix]) + if srv_pkey == cli_pkey do + exit :normal + end + # Connected :inet.setopts(socket, [active: true]) -- cgit v1.2.3