aboutsummaryrefslogtreecommitdiff
path: root/shard
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2018-10-15 12:18:05 +0200
committerAlex Auvolat <alex@adnab.me>2018-10-15 12:18:05 +0200
commit8c49dd71d29359447c24b1cd4f48a8faf0c4fdca (patch)
treee2d8b61139d6b72e6abfda6277918f54fdae36ef /shard
parent7a9678843647de930885792149b279ef105f67b6 (diff)
downloadshard-8c49dd71d29359447c24b1cd4f48a8faf0c4fdca.tar.gz
shard-8c49dd71d29359447c24b1cd4f48a8faf0c4fdca.zip
Refactor shard starting/stopping
Diffstat (limited to 'shard')
-rw-r--r--shard/lib/app/chat.ex87
-rw-r--r--shard/lib/app/identity.ex45
-rw-r--r--shard/lib/app/pagestore.ex2
-rw-r--r--shard/lib/application.ex6
-rw-r--r--shard/lib/cli/cli.ex15
-rw-r--r--shard/lib/keys.ex4
-rw-r--r--shard/lib/manager.ex147
7 files changed, 146 insertions, 160 deletions
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex
index 2e5e076..d253030 100644
--- a/shard/lib/app/chat.ex
+++ b/shard/lib/app/chat.ex
@@ -36,15 +36,12 @@ defmodule SApp.Chat do
"""
defstruct [:channel]
- end
- defimpl Shard.Manifest, for: Manifest do
- def start(m) do
- DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, m})
+ defimpl Shard.Manifest do
+ def module(_m), do: SApp.Chat
end
end
-
defmodule PrivChat.Manifest do
@moduledoc"""
Manifest for a private chat room defined by the list of participants.
@@ -60,11 +57,9 @@ defmodule SApp.Chat do
def new(pk_list) do
%__MODULE__{pk_list: pk_list |> Enum.sort |> Enum.uniq}
end
- end
- defimpl Shard.Manifest, for: PrivChat.Manifest do
- def start(m) do
- DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, m})
+ defimpl Shard.Manifest do
+ def module(_m), do: SApp.Chat
end
end
@@ -85,44 +80,39 @@ defmodule SApp.Chat do
def init(manifest) do
id = SData.term_hash manifest
- case Shard.Manager.register(id, manifest, self()) do
- :ok ->
- netgroup = case manifest do
- %Manifest{channel: _channel} ->
- %SNet.PubShardGroup{id: id}
- %PrivChat.Manifest{pk_list: pk_list} ->
- %SNet.PrivGroup{pk_list: pk_list}
- end
- Shard.Manager.dispatch_to(id, nil, self())
- {:ok, page_store} = SApp.PageStore.start_link(id, :page_store, netgroup)
- {root, read} = case Shard.Manager.load_state id do
- %{root: root, read: read} -> {root, read}
- _ -> {nil, nil}
- end
- root = cond do
- root == nil -> nil
- GenServer.call(page_store, {:have_rec, root}) -> root
- true ->
- Logger.warn "Not all pages for saved root were saved, restarting from an empty state!"
- nil
- end
- mst = %MST{store: %SApp.PageStore{pid: page_store},
- cmp: &msg_cmp/2,
- root: root}
- SNet.Group.init_lookup(netgroup, self())
- {:ok,
- %{id: id,
- netgroup: netgroup,
- manifest: manifest,
- page_store: page_store,
- mst: mst,
- subs: MapSet.new,
- read: read,
- }
- }
- :redundant ->
- exit(:redundant)
+ netgroup = case manifest do
+ %Manifest{channel: _channel} ->
+ %SNet.PubShardGroup{id: id}
+ %PrivChat.Manifest{pk_list: pk_list} ->
+ %SNet.PrivGroup{pk_list: pk_list}
+ end
+ Shard.Manager.dispatch_to(id, nil, self())
+ {:ok, page_store} = SApp.PageStore.start_link(id, :page_store, netgroup)
+ {root, read} = case Shard.Manager.load_state id do
+ %{root: root, read: read} -> {root, read}
+ _ -> {nil, nil}
end
+ root = cond do
+ root == nil -> nil
+ GenServer.call(page_store, {:have_rec, root}) -> root
+ true ->
+ Logger.warn "Not all pages for saved root were saved, restarting from an empty state!"
+ nil
+ end
+ mst = %MST{store: %SApp.PageStore{pid: page_store},
+ cmp: &msg_cmp/2,
+ root: root}
+ SNet.Group.init_lookup(netgroup, self())
+ {:ok,
+ %{id: id,
+ netgroup: netgroup,
+ manifest: manifest,
+ page_store: page_store,
+ mst: mst,
+ subs: MapSet.new,
+ read: read,
+ }
+ }
end
@doc """
@@ -215,11 +205,6 @@ defmodule SApp.Chat do
@doc """
Implementation of the :msg handler, which is the main handler for messages
comming from other peers concerning this chat room.
-
- Messages are:
- - `{:get, start}`: get some messages starting at a given Merkle hash
- - `{:info, start, list, rest}`: put some messages and informs of the
- Merkle hash of the store of older messages.
"""
def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do
if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
diff --git a/shard/lib/app/identity.ex b/shard/lib/app/identity.ex
index 42d1bf8..bacfdab 100644
--- a/shard/lib/app/identity.ex
+++ b/shard/lib/app/identity.ex
@@ -24,11 +24,9 @@ defmodule SApp.Identity do
"""
defstruct [:pk]
- end
- defimpl Shard.Manifest, for: Manifest do
- def start(m) do
- DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Identity, m.pk})
+ defimpl Shard.Manifest do
+ def module(_m), do: SApp.Identity
end
end
@@ -36,33 +34,28 @@ defmodule SApp.Identity do
defstruct [:info, :rev, :signed]
end
- def start_link(pk) do
- GenServer.start_link(__MODULE__, pk)
+ def start_link(manifest) do
+ GenServer.start_link(__MODULE__, manifest)
end
- def init(pk) do
- manifest = %Manifest{pk: pk}
+ def init(manifest) do
+ %Manifest{pk: pk} = manifest
id = SData.term_hash manifest
- case Shard.Manager.register(id, manifest, self()) do
- :ok ->
- Shard.Manager.dispatch_to(id, nil, self())
- state = case Shard.Manager.load_state(id) do
- nil ->
- info = %{nick: default_nick(pk)}
- SData.SignRev.new info
- st ->
- st
- end
- netgroup = %SNet.PubShardGroup{id: id}
- SNet.Group.init_lookup(netgroup, self())
- if Shard.Keys.have_sk? pk do
- GenServer.cast(self(), :update_peer_info)
- end
- {:ok, %{pk: pk, id: id, state: state, netgroup: netgroup}}
- :redundant ->
- exit(:redundant)
+ Shard.Manager.dispatch_to(id, nil, self())
+ state = case Shard.Manager.load_state(id) do
+ nil ->
+ info = %{nick: default_nick(pk)}
+ SData.SignRev.new info
+ st ->
+ st
+ end
+ netgroup = %SNet.PubShardGroup{id: id}
+ SNet.Group.init_lookup(netgroup, self())
+ if Shard.Keys.have_sk? pk do
+ GenServer.cast(self(), :update_peer_info)
end
+ {:ok, %{pk: pk, id: id, state: state, netgroup: netgroup}}
end
def handle_call(:manifest, _from, state) do
diff --git a/shard/lib/app/pagestore.ex b/shard/lib/app/pagestore.ex
index 8f6be59..5165b84 100644
--- a/shard/lib/app/pagestore.ex
+++ b/shard/lib/app/pagestore.ex
@@ -125,7 +125,7 @@ defmodule SApp.PageStore do
def handle_cast({:msg, conn_pid, auth, _shard_id, _path, msg}, state) do
if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
- state
+ {:noreply, state}
else
state = case msg do
{:get, key} ->
diff --git a/shard/lib/application.ex b/shard/lib/application.ex
index 2017041..de9c998 100644
--- a/shard/lib/application.ex
+++ b/shard/lib/application.ex
@@ -13,8 +13,6 @@ defmodule Shard.Application do
# Define workers and child supervisors to be supervised
children = [
- { DynamicSupervisor, strategy: :one_for_one, name: Shard.DynamicSupervisor },
-
# Networking
SNet.Addr,
SNet.Manager,
@@ -25,8 +23,10 @@ defmodule Shard.Application do
# Keys & identities
Shard.Keys,
+
+ # Initialize user data
{ Task, fn ->
- Shard.Manifest.start %SApp.Chat.Manifest{channel: "lobby"}
+ Shard.Manager.find_or_start %SApp.Chat.Manifest{channel: "lobby"}
Shard.Keys.get_any_identity
end },
]
diff --git a/shard/lib/cli/cli.ex b/shard/lib/cli/cli.ex
index 4f17aeb..d454a0b 100644
--- a/shard/lib/cli/cli.ex
+++ b/shard/lib/cli/cli.ex
@@ -8,8 +8,12 @@ defmodule SCLI do
end
def run() do
- for {_chid, %SApp.Chat.Manifest{}, chpid} <- Shard.Manager.list_shards do
- SApp.Chat.subscribe(chpid)
+ for {_chid, manifest, _} <- Shard.Manager.list_shards do
+ case manifest do
+ %SApp.Chat.Manifest{} ->
+ SApp.Chat.subscribe(Shard.Manager.find_or_start manifest)
+ _ -> nil
+ end
end
pk = Shard.Keys.get_any_identity
@@ -92,7 +96,7 @@ defmodule SCLI do
defp handle_command(state, ["list"]) do
IO.puts "List of known channels:"
- for {_chid, %SApp.Chat.Manifest{channel: chan}, _chpid} <- Shard.Manager.list_shards do
+ for {_chid, %SApp.Chat.Manifest{channel: chan}, _} <- Shard.Manager.list_shards do
IO.puts "##{chan}"
end
state
@@ -120,9 +124,8 @@ defmodule SCLI do
end
defp handle_command(state, ["pm" | people_list]) do
- known_people = for {_, %SApp.Identity.Manifest{pk: pk}, pid} <- Shard.Manager.list_shards() do
- info = SApp.Identity.get_info(pid)
- {pk, info.nick}
+ known_people = for {_, %SApp.Identity.Manifest{pk: pk}, _} <- Shard.Manager.list_shards() do
+ {pk, SApp.Identity.get_nick(pk)}
end
pk_list = for qname <- people_list do
candidates = for {pk, nick} <- known_people,
diff --git a/shard/lib/keys.ex b/shard/lib/keys.ex
index f98deba..e0180b7 100644
--- a/shard/lib/keys.ex
+++ b/shard/lib/keys.ex
@@ -16,7 +16,7 @@ defmodule Shard.Keys do
:dets.start
{:ok, @key_db} = :dets.open_file(@key_db, [type: :set])
for [pk, _] <- :dets.match(@key_db, {:'$1', :'$2'}) do
- Shard.Manifest.start %SApp.Identity.Manifest{pk: pk}
+ Shard.Manager.find_or_start %SApp.Identity.Manifest{pk: pk}
end
nil
end
@@ -53,7 +53,7 @@ defmodule Shard.Keys do
{pk, sk} = gen_keypair(Application.get_env(:shard, :identity_suffix))
Logger.info "New identity: #{pk|>Base.encode16}"
:dets.insert @key_db, {pk, sk}
- Shard.Manifest.start %SApp.Identity.Manifest{pk: pk}
+ Shard.Manager.find_or_start %SApp.Identity.Manifest{pk: pk}
pk
end
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex
index d6b493b..3a0e21c 100644
--- a/shard/lib/manager.ex
+++ b/shard/lib/manager.ex
@@ -5,14 +5,18 @@ defprotocol Shard.Manifest do
The hash of the manifest is the unique identifier of that shard on the network.
The Manifest protocol is a protocol implemented by the manifest structs for the
- different shard types. It contains an operation start() that is able to launch the
- correct process for this shard and connect to other peers that use it.
+ different shard types. It contains an operation module() that returns the main module
+ for the shard processes. The module must contain a function with the signature:
+
+ {:ok, pid} = <module>.start_link(manifest)
+
+ that will be called when the shard must be started.
"""
@doc"""
- Start the corresponding Shard process
+ Get the module in question.
"""
- def start(manifest)
+ def module(manifest)
end
defmodule Shard.Manager do
@@ -22,19 +26,15 @@ defmodule Shard.Manager do
- :shard_db (persistent with DETS)
List of
- { id, manifest, pid | nil }
-
- - :shard_state (persistent with DETS)
-
- List of
- { id, state }
+ { id, manifest, state }
- :peer_db (persistent with DETS)
Mult-list of
{ shard_id, peer_info } # TODO: add health info (last seen, ping, etc)
- peer_info := {:inet, ip, port} | {:inet6, ip, port} | {:onion, name}
+ peer_info := {:inet, ip, port}
+ TODO peer_info |= {:inet6, ip, port} | {:onion, name}
- :shard_procs (not persistent)
@@ -47,7 +47,6 @@ defmodule Shard.Manager do
require Logger
@shard_db [Application.get_env(:shard, :data_path), "shard_db"] |> Path.join |> String.to_atom
- @shard_state [Application.get_env(:shard, :data_path), "shard_state"] |> Path.join |> String.to_atom
@peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom
def start_link(_) do
@@ -55,66 +54,64 @@ defmodule Shard.Manager do
end
def init(_) do
- :dets.open_file(@shard_db, [type: :set])
- for [{id, manifest, _pid}] <- :dets.match @shard_db, :"$1" do
- :dets.insert @shard_db, {id, manifest, nil}
- spawn fn -> Shard.Manifest.start manifest end
- end
+ Process.flag(:trap_exit, true)
- :dets.open_file(@shard_state, [type: :set])
+ :dets.open_file(@shard_db, [type: :set])
:dets.open_file(@peer_db, [type: :bag])
:ets.new(:shard_procs, [:set, :protected, :named_table])
- {:ok, nil}
+ {:ok, %{}}
end
- def handle_call({:register, shard_id, manifest, pid}, _from, state) do
- will_live = case :dets.lookup(@shard_db, shard_id) do
- [{ ^shard_id, _, pid }] when pid != nil -> not Process.alive?(pid)
- _ -> true
+ def handle_call({:find_or_start, shard_id, manifest}, _from, state) do
+ case :dets.lookup(@shard_db, shard_id) do
+ [] -> :dets.insert(@shard_db, {shard_id, manifest, nil})
+ _ -> nil
end
- reply = if will_live do
- Process.monitor(pid)
- :dets.insert(@shard_db, {shard_id, manifest, pid})
- :ok
- else
- :redundant
+
+ case :ets.lookup(:shard_procs, {shard_id, nil}) do
+ [] ->
+ {:ok, pid} = apply(Shard.Manifest.module(manifest), :start_link, [manifest])
+ :ets.insert(:shard_procs, {{shard_id, nil}, pid})
+ state = Map.put(state, pid, {shard_id, nil})
+ {:reply, pid, state}
+ pid ->
+ {:reply, pid, state}
end
- {:reply, reply, state}
end
def handle_cast({:dispatch_to, shard_id, path, pid}, state) do
:ets.insert(:shard_procs, { {shard_id, path}, pid })
- Process.monitor(pid)
+ state = Map.put(state, pid, {shard_id, path})
+ if path != nil do
+ Process.monitor(pid)
+ end
{:noreply, state}
end
- def handle_cast({:interested, conn_pid, peer_info, auth, shards}, state) do
- for shard_id <- shards do
- case :dets.lookup(@shard_db, shard_id) do
- [{ ^shard_id, _, pid }] ->
- :dets.insert(@peer_db, {shard_id, peer_info})
- GenServer.cast(pid, {:interested, conn_pid, auth})
- [] -> nil
- end
- end
+ def handle_cast({:peer_db_insert, shard_id, peer_info}, state) do
+ :dets.insert(@peer_db, {shard_id, peer_info})
{:noreply, state}
end
- def handle_cast({:not_interested, peer_info, shard_id}, state) do
+ def handle_cast({:peer_db_delete, shard_id, peer_info}, state) do
:dets.match_delete(@peer_db, {shard_id, peer_info})
{:noreply, state}
end
- def handle_cast({:shard_peer_db_insert, shard_id, peer_info}, state) do
- :dets.insert(@peer_db, {shard_id, peer_info})
- {:noreply, state}
+ def handle_info({:DOWN, _, :process, pid, reason}, state) do
+ handle_info({:EXIT, pid, reason}, state)
end
- def handle_info({:DOWN, _, :process, pid, _}, state) do
- :ets.match_delete(:shard_procs, {:_, pid})
- {:noreply, state}
+ def handle_info({:EXIT, pid, _reason}, state) do
+ case state[pid] do
+ nil -> {:noreply, state}
+ info ->
+ :ets.delete(:shard_procs, info)
+ state = Map.delete(state, pid)
+ {:noreply, state}
+ end
end
@@ -126,29 +123,37 @@ defmodule Shard.Manager do
Dispatch incoming message to correct shard process
"""
def incoming(conn_pid, peer_info, auth, {:interested, shards}) do
- GenServer.cast(__MODULE__, {:interested, conn_pid, peer_info, auth, shards})
+ for shard_id <- shards do
+ case :dets.lookup(@shard_db, shard_id) do
+ [{ ^shard_id, manifest, _ }] ->
+ GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info})
+ pid = case :ets.lookup(:shard_procs, {shard_id, nil}) do
+ [] ->
+ GenServer.call(__MODULE__, {:find_or_start, shard_id, manifest})
+ [{{^shard_id, nil}, pid}] -> pid
+ end
+ GenServer.cast(pid, {:interested, conn_pid, auth})
+ [] -> nil
+ end
+ end
end
def incoming(_conn_pid, peer_info, _auth, {:not_interested, shard}) do
- GenServer.cast(__MODULE__, {:not_interested, peer_info, shard})
+ GenServer.cast(__MODULE__, {:peer_db_delete, shard, peer_info})
end
def incoming(conn_pid, peer_info, auth, {shard_id, path, msg}) do
case :dets.lookup(@shard_db, shard_id) do
[] ->
GenServer.cast(conn_pid, {:send_msg, {:not_interested, shard_id}})
- [_] ->
- case :dets.match(@peer_db, {shard_id, peer_info}) do
+ [{ ^shard_id, manifest, _}] ->
+ GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info})
+ pid = case :ets.lookup(:shard_procs, {shard_id, path}) do
[] ->
- GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_info})
- _ -> nil
- end
- case :ets.lookup(:shard_procs, {shard_id, path}) do
- [{ {^shard_id, ^path}, pid }] ->
- GenServer.cast(pid, {:msg, conn_pid, auth, shard_id, path, msg})
- [] ->
- Logger.info("Warning: dropping message for #{inspect shard_id}/#{inspect path}, no handler running.\n\t#{inspect msg}")
+ GenServer.call(__MODULE__, {:find_or_start, shard_id, manifest})
+ [{ {^shard_id, ^path}, pid }] -> pid
end
+ GenServer.cast(pid, {:msg, conn_pid, auth, shard_id, path, msg})
end
end
@@ -184,8 +189,8 @@ defmodule Shard.Manager do
Return the saved state value for a shard
"""
def load_state(shard_id) do
- case :dets.lookup(@shard_state, shard_id) do
- [{^shard_id, state}] -> state
+ case :dets.lookup(@shard_db, shard_id) do
+ [{^shard_id, _, state}] -> state
_ -> nil
end
end
@@ -194,7 +199,10 @@ defmodule Shard.Manager do
Save a state value for a shard
"""
def save_state(shard_id, state) do
- :dets.insert(@shard_state, {shard_id, state})
+ case :dets.lookup(@shard_db, shard_id) do
+ [{^shard_id, manifest, _old_state}] ->
+ :dets.insert(@shard_db, {shard_id, manifest, state})
+ end
end
@@ -206,8 +214,8 @@ defmodule Shard.Manager do
Returns the pid for a shard if it exists
"""
def find_proc(shard_id) do
- case :dets.lookup(@shard_db, shard_id) do
- [{^shard_id, _, pid}] -> pid
+ case :ets.lookup(:shard_procs, {shard_id, nil}) do
+ [{{^shard_id, _}, pid}] -> pid
_ -> nil
end
end
@@ -218,13 +226,10 @@ defmodule Shard.Manager do
"""
def find_or_start(manifest) do
id = SData.term_hash manifest
- case find_proc id do
- nil ->
- case Shard.Manifest.start manifest do
- {:ok, pid} -> pid
- {:error, :redundant} -> find_proc id
- end
- pid -> pid
+ case :ets.lookup(:shard_procs, {id, nil}) do
+ [{{^id, nil}, pid}] -> pid
+ [] ->
+ GenServer.call(__MODULE__, {:find_or_start, id, manifest})
end
end