From 8c49dd71d29359447c24b1cd4f48a8faf0c4fdca Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Oct 2018 12:18:05 +0200 Subject: Refactor shard starting/stopping --- shard/lib/app/chat.ex | 87 +++++++++++---------------- shard/lib/app/identity.ex | 45 ++++++-------- shard/lib/app/pagestore.ex | 2 +- shard/lib/application.ex | 6 +- shard/lib/cli/cli.ex | 15 +++-- shard/lib/keys.ex | 4 +- shard/lib/manager.ex | 147 +++++++++++++++++++++++---------------------- 7 files changed, 146 insertions(+), 160 deletions(-) (limited to 'shard') diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex index 2e5e076..d253030 100644 --- a/shard/lib/app/chat.ex +++ b/shard/lib/app/chat.ex @@ -36,15 +36,12 @@ defmodule SApp.Chat do """ defstruct [:channel] - end - defimpl Shard.Manifest, for: Manifest do - def start(m) do - DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, m}) + defimpl Shard.Manifest do + def module(_m), do: SApp.Chat end end - defmodule PrivChat.Manifest do @moduledoc""" Manifest for a private chat room defined by the list of participants. @@ -60,11 +57,9 @@ defmodule SApp.Chat do def new(pk_list) do %__MODULE__{pk_list: pk_list |> Enum.sort |> Enum.uniq} end - end - defimpl Shard.Manifest, for: PrivChat.Manifest do - def start(m) do - DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, m}) + defimpl Shard.Manifest do + def module(_m), do: SApp.Chat end end @@ -85,44 +80,39 @@ defmodule SApp.Chat do def init(manifest) do id = SData.term_hash manifest - case Shard.Manager.register(id, manifest, self()) do - :ok -> - netgroup = case manifest do - %Manifest{channel: _channel} -> - %SNet.PubShardGroup{id: id} - %PrivChat.Manifest{pk_list: pk_list} -> - %SNet.PrivGroup{pk_list: pk_list} - end - Shard.Manager.dispatch_to(id, nil, self()) - {:ok, page_store} = SApp.PageStore.start_link(id, :page_store, netgroup) - {root, read} = case Shard.Manager.load_state id do - %{root: root, read: read} -> {root, read} - _ -> {nil, nil} - end - root = cond do - root == nil -> nil - GenServer.call(page_store, {:have_rec, root}) -> root - true -> - Logger.warn "Not all pages for saved root were saved, restarting from an empty state!" - nil - end - mst = %MST{store: %SApp.PageStore{pid: page_store}, - cmp: &msg_cmp/2, - root: root} - SNet.Group.init_lookup(netgroup, self()) - {:ok, - %{id: id, - netgroup: netgroup, - manifest: manifest, - page_store: page_store, - mst: mst, - subs: MapSet.new, - read: read, - } - } - :redundant -> - exit(:redundant) + netgroup = case manifest do + %Manifest{channel: _channel} -> + %SNet.PubShardGroup{id: id} + %PrivChat.Manifest{pk_list: pk_list} -> + %SNet.PrivGroup{pk_list: pk_list} + end + Shard.Manager.dispatch_to(id, nil, self()) + {:ok, page_store} = SApp.PageStore.start_link(id, :page_store, netgroup) + {root, read} = case Shard.Manager.load_state id do + %{root: root, read: read} -> {root, read} + _ -> {nil, nil} end + root = cond do + root == nil -> nil + GenServer.call(page_store, {:have_rec, root}) -> root + true -> + Logger.warn "Not all pages for saved root were saved, restarting from an empty state!" + nil + end + mst = %MST{store: %SApp.PageStore{pid: page_store}, + cmp: &msg_cmp/2, + root: root} + SNet.Group.init_lookup(netgroup, self()) + {:ok, + %{id: id, + netgroup: netgroup, + manifest: manifest, + page_store: page_store, + mst: mst, + subs: MapSet.new, + read: read, + } + } end @doc """ @@ -215,11 +205,6 @@ defmodule SApp.Chat do @doc """ Implementation of the :msg handler, which is the main handler for messages comming from other peers concerning this chat room. - - Messages are: - - `{:get, start}`: get some messages starting at a given Merkle hash - - `{:info, start, list, rest}`: put some messages and informs of the - Merkle hash of the store of older messages. """ def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do diff --git a/shard/lib/app/identity.ex b/shard/lib/app/identity.ex index 42d1bf8..bacfdab 100644 --- a/shard/lib/app/identity.ex +++ b/shard/lib/app/identity.ex @@ -24,11 +24,9 @@ defmodule SApp.Identity do """ defstruct [:pk] - end - defimpl Shard.Manifest, for: Manifest do - def start(m) do - DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Identity, m.pk}) + defimpl Shard.Manifest do + def module(_m), do: SApp.Identity end end @@ -36,33 +34,28 @@ defmodule SApp.Identity do defstruct [:info, :rev, :signed] end - def start_link(pk) do - GenServer.start_link(__MODULE__, pk) + def start_link(manifest) do + GenServer.start_link(__MODULE__, manifest) end - def init(pk) do - manifest = %Manifest{pk: pk} + def init(manifest) do + %Manifest{pk: pk} = manifest id = SData.term_hash manifest - case Shard.Manager.register(id, manifest, self()) do - :ok -> - Shard.Manager.dispatch_to(id, nil, self()) - state = case Shard.Manager.load_state(id) do - nil -> - info = %{nick: default_nick(pk)} - SData.SignRev.new info - st -> - st - end - netgroup = %SNet.PubShardGroup{id: id} - SNet.Group.init_lookup(netgroup, self()) - if Shard.Keys.have_sk? pk do - GenServer.cast(self(), :update_peer_info) - end - {:ok, %{pk: pk, id: id, state: state, netgroup: netgroup}} - :redundant -> - exit(:redundant) + Shard.Manager.dispatch_to(id, nil, self()) + state = case Shard.Manager.load_state(id) do + nil -> + info = %{nick: default_nick(pk)} + SData.SignRev.new info + st -> + st + end + netgroup = %SNet.PubShardGroup{id: id} + SNet.Group.init_lookup(netgroup, self()) + if Shard.Keys.have_sk? pk do + GenServer.cast(self(), :update_peer_info) end + {:ok, %{pk: pk, id: id, state: state, netgroup: netgroup}} end def handle_call(:manifest, _from, state) do diff --git a/shard/lib/app/pagestore.ex b/shard/lib/app/pagestore.ex index 8f6be59..5165b84 100644 --- a/shard/lib/app/pagestore.ex +++ b/shard/lib/app/pagestore.ex @@ -125,7 +125,7 @@ defmodule SApp.PageStore do def handle_cast({:msg, conn_pid, auth, _shard_id, _path, msg}, state) do if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do - state + {:noreply, state} else state = case msg do {:get, key} -> diff --git a/shard/lib/application.ex b/shard/lib/application.ex index 2017041..de9c998 100644 --- a/shard/lib/application.ex +++ b/shard/lib/application.ex @@ -13,8 +13,6 @@ defmodule Shard.Application do # Define workers and child supervisors to be supervised children = [ - { DynamicSupervisor, strategy: :one_for_one, name: Shard.DynamicSupervisor }, - # Networking SNet.Addr, SNet.Manager, @@ -25,8 +23,10 @@ defmodule Shard.Application do # Keys & identities Shard.Keys, + + # Initialize user data { Task, fn -> - Shard.Manifest.start %SApp.Chat.Manifest{channel: "lobby"} + Shard.Manager.find_or_start %SApp.Chat.Manifest{channel: "lobby"} Shard.Keys.get_any_identity end }, ] diff --git a/shard/lib/cli/cli.ex b/shard/lib/cli/cli.ex index 4f17aeb..d454a0b 100644 --- a/shard/lib/cli/cli.ex +++ b/shard/lib/cli/cli.ex @@ -8,8 +8,12 @@ defmodule SCLI do end def run() do - for {_chid, %SApp.Chat.Manifest{}, chpid} <- Shard.Manager.list_shards do - SApp.Chat.subscribe(chpid) + for {_chid, manifest, _} <- Shard.Manager.list_shards do + case manifest do + %SApp.Chat.Manifest{} -> + SApp.Chat.subscribe(Shard.Manager.find_or_start manifest) + _ -> nil + end end pk = Shard.Keys.get_any_identity @@ -92,7 +96,7 @@ defmodule SCLI do defp handle_command(state, ["list"]) do IO.puts "List of known channels:" - for {_chid, %SApp.Chat.Manifest{channel: chan}, _chpid} <- Shard.Manager.list_shards do + for {_chid, %SApp.Chat.Manifest{channel: chan}, _} <- Shard.Manager.list_shards do IO.puts "##{chan}" end state @@ -120,9 +124,8 @@ defmodule SCLI do end defp handle_command(state, ["pm" | people_list]) do - known_people = for {_, %SApp.Identity.Manifest{pk: pk}, pid} <- Shard.Manager.list_shards() do - info = SApp.Identity.get_info(pid) - {pk, info.nick} + known_people = for {_, %SApp.Identity.Manifest{pk: pk}, _} <- Shard.Manager.list_shards() do + {pk, SApp.Identity.get_nick(pk)} end pk_list = for qname <- people_list do candidates = for {pk, nick} <- known_people, diff --git a/shard/lib/keys.ex b/shard/lib/keys.ex index f98deba..e0180b7 100644 --- a/shard/lib/keys.ex +++ b/shard/lib/keys.ex @@ -16,7 +16,7 @@ defmodule Shard.Keys do :dets.start {:ok, @key_db} = :dets.open_file(@key_db, [type: :set]) for [pk, _] <- :dets.match(@key_db, {:'$1', :'$2'}) do - Shard.Manifest.start %SApp.Identity.Manifest{pk: pk} + Shard.Manager.find_or_start %SApp.Identity.Manifest{pk: pk} end nil end @@ -53,7 +53,7 @@ defmodule Shard.Keys do {pk, sk} = gen_keypair(Application.get_env(:shard, :identity_suffix)) Logger.info "New identity: #{pk|>Base.encode16}" :dets.insert @key_db, {pk, sk} - Shard.Manifest.start %SApp.Identity.Manifest{pk: pk} + Shard.Manager.find_or_start %SApp.Identity.Manifest{pk: pk} pk end diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex index d6b493b..3a0e21c 100644 --- a/shard/lib/manager.ex +++ b/shard/lib/manager.ex @@ -5,14 +5,18 @@ defprotocol Shard.Manifest do The hash of the manifest is the unique identifier of that shard on the network. The Manifest protocol is a protocol implemented by the manifest structs for the - different shard types. It contains an operation start() that is able to launch the - correct process for this shard and connect to other peers that use it. + different shard types. It contains an operation module() that returns the main module + for the shard processes. The module must contain a function with the signature: + + {:ok, pid} = .start_link(manifest) + + that will be called when the shard must be started. """ @doc""" - Start the corresponding Shard process + Get the module in question. """ - def start(manifest) + def module(manifest) end defmodule Shard.Manager do @@ -22,19 +26,15 @@ defmodule Shard.Manager do - :shard_db (persistent with DETS) List of - { id, manifest, pid | nil } - - - :shard_state (persistent with DETS) - - List of - { id, state } + { id, manifest, state } - :peer_db (persistent with DETS) Mult-list of { shard_id, peer_info } # TODO: add health info (last seen, ping, etc) - peer_info := {:inet, ip, port} | {:inet6, ip, port} | {:onion, name} + peer_info := {:inet, ip, port} + TODO peer_info |= {:inet6, ip, port} | {:onion, name} - :shard_procs (not persistent) @@ -47,7 +47,6 @@ defmodule Shard.Manager do require Logger @shard_db [Application.get_env(:shard, :data_path), "shard_db"] |> Path.join |> String.to_atom - @shard_state [Application.get_env(:shard, :data_path), "shard_state"] |> Path.join |> String.to_atom @peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom def start_link(_) do @@ -55,66 +54,64 @@ defmodule Shard.Manager do end def init(_) do - :dets.open_file(@shard_db, [type: :set]) - for [{id, manifest, _pid}] <- :dets.match @shard_db, :"$1" do - :dets.insert @shard_db, {id, manifest, nil} - spawn fn -> Shard.Manifest.start manifest end - end + Process.flag(:trap_exit, true) - :dets.open_file(@shard_state, [type: :set]) + :dets.open_file(@shard_db, [type: :set]) :dets.open_file(@peer_db, [type: :bag]) :ets.new(:shard_procs, [:set, :protected, :named_table]) - {:ok, nil} + {:ok, %{}} end - def handle_call({:register, shard_id, manifest, pid}, _from, state) do - will_live = case :dets.lookup(@shard_db, shard_id) do - [{ ^shard_id, _, pid }] when pid != nil -> not Process.alive?(pid) - _ -> true + def handle_call({:find_or_start, shard_id, manifest}, _from, state) do + case :dets.lookup(@shard_db, shard_id) do + [] -> :dets.insert(@shard_db, {shard_id, manifest, nil}) + _ -> nil end - reply = if will_live do - Process.monitor(pid) - :dets.insert(@shard_db, {shard_id, manifest, pid}) - :ok - else - :redundant + + case :ets.lookup(:shard_procs, {shard_id, nil}) do + [] -> + {:ok, pid} = apply(Shard.Manifest.module(manifest), :start_link, [manifest]) + :ets.insert(:shard_procs, {{shard_id, nil}, pid}) + state = Map.put(state, pid, {shard_id, nil}) + {:reply, pid, state} + pid -> + {:reply, pid, state} end - {:reply, reply, state} end def handle_cast({:dispatch_to, shard_id, path, pid}, state) do :ets.insert(:shard_procs, { {shard_id, path}, pid }) - Process.monitor(pid) + state = Map.put(state, pid, {shard_id, path}) + if path != nil do + Process.monitor(pid) + end {:noreply, state} end - def handle_cast({:interested, conn_pid, peer_info, auth, shards}, state) do - for shard_id <- shards do - case :dets.lookup(@shard_db, shard_id) do - [{ ^shard_id, _, pid }] -> - :dets.insert(@peer_db, {shard_id, peer_info}) - GenServer.cast(pid, {:interested, conn_pid, auth}) - [] -> nil - end - end + def handle_cast({:peer_db_insert, shard_id, peer_info}, state) do + :dets.insert(@peer_db, {shard_id, peer_info}) {:noreply, state} end - def handle_cast({:not_interested, peer_info, shard_id}, state) do + def handle_cast({:peer_db_delete, shard_id, peer_info}, state) do :dets.match_delete(@peer_db, {shard_id, peer_info}) {:noreply, state} end - def handle_cast({:shard_peer_db_insert, shard_id, peer_info}, state) do - :dets.insert(@peer_db, {shard_id, peer_info}) - {:noreply, state} + def handle_info({:DOWN, _, :process, pid, reason}, state) do + handle_info({:EXIT, pid, reason}, state) end - def handle_info({:DOWN, _, :process, pid, _}, state) do - :ets.match_delete(:shard_procs, {:_, pid}) - {:noreply, state} + def handle_info({:EXIT, pid, _reason}, state) do + case state[pid] do + nil -> {:noreply, state} + info -> + :ets.delete(:shard_procs, info) + state = Map.delete(state, pid) + {:noreply, state} + end end @@ -126,29 +123,37 @@ defmodule Shard.Manager do Dispatch incoming message to correct shard process """ def incoming(conn_pid, peer_info, auth, {:interested, shards}) do - GenServer.cast(__MODULE__, {:interested, conn_pid, peer_info, auth, shards}) + for shard_id <- shards do + case :dets.lookup(@shard_db, shard_id) do + [{ ^shard_id, manifest, _ }] -> + GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info}) + pid = case :ets.lookup(:shard_procs, {shard_id, nil}) do + [] -> + GenServer.call(__MODULE__, {:find_or_start, shard_id, manifest}) + [{{^shard_id, nil}, pid}] -> pid + end + GenServer.cast(pid, {:interested, conn_pid, auth}) + [] -> nil + end + end end def incoming(_conn_pid, peer_info, _auth, {:not_interested, shard}) do - GenServer.cast(__MODULE__, {:not_interested, peer_info, shard}) + GenServer.cast(__MODULE__, {:peer_db_delete, shard, peer_info}) end def incoming(conn_pid, peer_info, auth, {shard_id, path, msg}) do case :dets.lookup(@shard_db, shard_id) do [] -> GenServer.cast(conn_pid, {:send_msg, {:not_interested, shard_id}}) - [_] -> - case :dets.match(@peer_db, {shard_id, peer_info}) do + [{ ^shard_id, manifest, _}] -> + GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info}) + pid = case :ets.lookup(:shard_procs, {shard_id, path}) do [] -> - GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_info}) - _ -> nil - end - case :ets.lookup(:shard_procs, {shard_id, path}) do - [{ {^shard_id, ^path}, pid }] -> - GenServer.cast(pid, {:msg, conn_pid, auth, shard_id, path, msg}) - [] -> - Logger.info("Warning: dropping message for #{inspect shard_id}/#{inspect path}, no handler running.\n\t#{inspect msg}") + GenServer.call(__MODULE__, {:find_or_start, shard_id, manifest}) + [{ {^shard_id, ^path}, pid }] -> pid end + GenServer.cast(pid, {:msg, conn_pid, auth, shard_id, path, msg}) end end @@ -184,8 +189,8 @@ defmodule Shard.Manager do Return the saved state value for a shard """ def load_state(shard_id) do - case :dets.lookup(@shard_state, shard_id) do - [{^shard_id, state}] -> state + case :dets.lookup(@shard_db, shard_id) do + [{^shard_id, _, state}] -> state _ -> nil end end @@ -194,7 +199,10 @@ defmodule Shard.Manager do Save a state value for a shard """ def save_state(shard_id, state) do - :dets.insert(@shard_state, {shard_id, state}) + case :dets.lookup(@shard_db, shard_id) do + [{^shard_id, manifest, _old_state}] -> + :dets.insert(@shard_db, {shard_id, manifest, state}) + end end @@ -206,8 +214,8 @@ defmodule Shard.Manager do Returns the pid for a shard if it exists """ def find_proc(shard_id) do - case :dets.lookup(@shard_db, shard_id) do - [{^shard_id, _, pid}] -> pid + case :ets.lookup(:shard_procs, {shard_id, nil}) do + [{{^shard_id, _}, pid}] -> pid _ -> nil end end @@ -218,13 +226,10 @@ defmodule Shard.Manager do """ def find_or_start(manifest) do id = SData.term_hash manifest - case find_proc id do - nil -> - case Shard.Manifest.start manifest do - {:ok, pid} -> pid - {:error, :redundant} -> find_proc id - end - pid -> pid + case :ets.lookup(:shard_procs, {id, nil}) do + [{{^id, nil}, pid}] -> pid + [] -> + GenServer.call(__MODULE__, {:find_or_start, id, manifest}) end end -- cgit v1.2.3