aboutsummaryrefslogtreecommitdiff
path: root/shard
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2018-10-15 16:15:35 +0200
committerAlex Auvolat <alex@adnab.me>2018-10-15 16:15:35 +0200
commit181baf7e0c26c51d7c605bc9797f77ced9188455 (patch)
treeb8520b756c25df1648006f9390d51974b94ea9c1 /shard
parent8c49dd71d29359447c24b1cd4f48a8faf0c4fdca (diff)
downloadshard-181baf7e0c26c51d7c605bc9797f77ced9188455.tar.gz
shard-181baf7e0c26c51d7c605bc9797f77ced9188455.zip
Basic infrastructure for dependency between shards
Diffstat (limited to 'shard')
-rw-r--r--shard/lib/app/chat.ex5
-rw-r--r--shard/lib/app/directory.ex168
-rw-r--r--shard/lib/app/identity.ex8
-rw-r--r--shard/lib/keys.ex8
-rw-r--r--shard/lib/manager.ex174
5 files changed, 337 insertions, 26 deletions
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex
index d253030..4c7fc00 100644
--- a/shard/lib/app/chat.ex
+++ b/shard/lib/app/chat.ex
@@ -141,6 +141,11 @@ defmodule SApp.Chat do
end
end
+ def handle_cast(:send_deps, state) do
+ GenServer.cast(Shard.Manager, {:dep_list, state.id, []})
+ {:noreply, state}
+ end
+
def handle_cast(:mark_read, state) do
state = %{state | read: state.mst.root}
save_state(state)
diff --git a/shard/lib/app/directory.ex b/shard/lib/app/directory.ex
new file mode 100644
index 0000000..f7d871a
--- /dev/null
+++ b/shard/lib/app/directory.ex
@@ -0,0 +1,168 @@
+defmodule SApp.Directory do
+ @moduledoc"""
+ Shard application for a directory of other shards.
+
+ TODO: use MST for file list instead of plain list
+ """
+
+ use GenServer
+
+ require Logger
+
+ defmodule Manifest do
+ @moduledoc"""
+ Manifest for a directory. This directory is owned by a user,
+ has a name, and can be either public or private.
+ """
+
+ defstruct [:owner, :public, :name]
+
+ defimpl Shard.Manifest do
+ def module(_m), do: SApp.Directory
+ end
+ end
+
+ def start_link(manifest) do
+ GenServer.start_link(__MODULE__, manifest)
+ end
+
+ def init(manifest) do
+ %Manifest{owner: owner, public: public, name: name} = manifest
+ id = SData.term_hash manifest
+
+ Shard.Manager.dispatch_to(id, nil, self())
+ files = case Shard.Manager.load_state(id) do
+ nil ->
+ SData.SignRev.new %{}
+ st -> st
+ end
+ netgroup = case public do
+ true -> %SNet.PubShardGroup{id: id}
+ false -> %SNet.PrivGroup{pk_list: [owner]}
+ end
+ SNet.Group.init_lookup(netgroup, self())
+
+ {:ok, %{
+ owner: owner, public: public, name: name,
+ manifest: manifest, id: id, netgroup: netgroup,
+ files: files}}
+ end
+
+ def handle_call(:manifest, _from, state) do
+ {:reply, state.manifest, state}
+ end
+
+ def handle_call(:get_files, _from, state) do
+ {:reply, SData.SignRev.get(state.files), state}
+ end
+
+ def handle_call({:add_file, name, manifest}, _from, state) do
+ if Shard.Keys.have_sk?(state.owner) do
+ dict = SData.SignRev.get(state.files)
+ if dict[name] != nil and dict[name] != manifest do
+ {:reply, :exists_already, state}
+ else
+ dict = Map.put(dict, name, manifest)
+ GenServer.cast(Shard.Manager, {:dep_list, state.id, Map.values(dict)})
+ {:ok, st2} = SData.SignRev.set(state.files, dict, state.owner)
+ Shard.Manager.save_state(state.id, st2)
+ state = put_in(state.files, st2)
+ bcast_state(state)
+ {:reply, :ok, state}
+ end
+ else
+ {:reply, :impossible, state}
+ end
+ end
+
+ def handle_call({:rm_file, name}, _from, state) do
+ if Shard.Keys.have_sk?(state.owner) do
+ dict = SData.SignRev.get(state.files)
+ if dict[name] == nil do
+ {:reply, :not_found, state}
+ else
+ dict = Map.delete(dict, name)
+ GenServer.cast(Shard.Manager, {:dep_list, state.id, Map.values(dict)})
+ {:ok, st2} = SData.SignRev.set(state.files, dict, state.owner)
+ Shard.Manager.save_state(state.id, st2)
+ state = put_in(state.files, st2)
+ bcast_state(state)
+ {:reply, :ok, state}
+ end
+ else
+ {:reply, :impossible, state}
+ end
+ end
+
+ def handle_cast(:send_deps, state) do
+ dict = SData.SignRev.get(state.files)
+ GenServer.cast(Shard.Manager, {:dep_list, state.id, Map.values(dict)})
+ {:noreply, state}
+ end
+
+ def handle_cast({:interested, peer_pid, auth}, state) do
+ if SNet.Group.in_group?(state.netgroup, peer_pid, auth) do
+ SNet.Manager.send_pid(peer_pid, {state.id, nil, {:update, SData.SignRev.signed(state.files), true}})
+ end
+ {:noreply, state}
+ end
+
+ def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do
+ if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
+ {:noreply, state}
+ else
+ state = case msg do
+ {:update, signed, ask_reply} when signed != nil ->
+ state = case SData.SignRev.merge(state.files, signed, state.pk) do
+ {true, newfiles} ->
+ Shard.Manager.save_state(state.id, newfiles)
+ state = put_in(state.files, newfiles)
+ bcast_state(state, [conn_pid])
+ state
+ {false, _} ->
+ state
+ end
+ if ask_reply do
+ SNet.Manager.send_pid(conn_pid, {state.id, nil, {:update, SData.SignRev.signed(state.files), false}})
+ end
+ state
+ _ -> state
+ end
+ {:noreply, state}
+ end
+ end
+
+ defp bcast_state(state, exclude \\ []) do
+ msg = {state.id, nil, {:update, SData.SignRev.signed(state.files), false}}
+ SNet.Group.broadcast(state.netgroup, msg, exclude_pid: exclude)
+ end
+
+ # ================
+ # PUBLIC INTERFACE
+ # ================
+
+ @doc"""
+ Return list of files stored in this directory.
+
+ Returns a list of {name, manifests}.
+ """
+ def get_files(pid) do
+ GenServer.call(pid, :get_files)
+ end
+
+ @doc"""
+ Add a file to this directory. A file is a name for a shard manifest.
+ A file added to a directory becomes a dependency of the directory, i.e.
+ if the directory is pinned then all files inside are pinned as well.
+ """
+ def add_file(pid, name, manifest) do
+ GenServer.call(pid, {:add_file, name, manifest})
+ end
+
+ @doc"""
+ Remove a named file from this directory.
+ """
+ def rm_file(pid, name) do
+ GenServer.call(pid, {:rm_file, name})
+ end
+end
diff --git a/shard/lib/app/identity.ex b/shard/lib/app/identity.ex
index bacfdab..b034107 100644
--- a/shard/lib/app/identity.ex
+++ b/shard/lib/app/identity.ex
@@ -78,6 +78,14 @@ defmodule SApp.Identity do
end
end
+ def handle_cast(:send_deps, state) do
+ default_deps = [
+ %SApp.Directory.Manifest{owner: state.pk, public: false, name: "friends"}
+ ]
+ GenServer.cast(Shard.Manager, {:dep_list, state.id, default_deps})
+ {:noreply, state}
+ end
+
def handle_cast({:peer_connected, peer_pid}, state) do
GenServer.cast(peer_pid, {:send_msg, {:interested, [state.id]}})
{:noreply, state}
diff --git a/shard/lib/keys.ex b/shard/lib/keys.ex
index e0180b7..412baa2 100644
--- a/shard/lib/keys.ex
+++ b/shard/lib/keys.ex
@@ -16,7 +16,9 @@ defmodule Shard.Keys do
:dets.start
{:ok, @key_db} = :dets.open_file(@key_db, [type: :set])
for [pk, _] <- :dets.match(@key_db, {:'$1', :'$2'}) do
- Shard.Manager.find_or_start %SApp.Identity.Manifest{pk: pk}
+ m = %SApp.Identity.Manifest{pk: pk}
+ Shard.Manager.find_or_start m
+ GenServer.cast(Shard.Manager, {:pin, SData.term_hash(m)})
end
nil
end
@@ -53,7 +55,9 @@ defmodule Shard.Keys do
{pk, sk} = gen_keypair(Application.get_env(:shard, :identity_suffix))
Logger.info "New identity: #{pk|>Base.encode16}"
:dets.insert @key_db, {pk, sk}
- Shard.Manager.find_or_start %SApp.Identity.Manifest{pk: pk}
+ m = %SApp.Identity.Manifest{pk: pk}
+ Shard.Manager.find_or_start m
+ GenServer.cast(Shard.Manager, {:pin, SData.term_hash(m)})
pk
end
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex
index 3a0e21c..990bcea 100644
--- a/shard/lib/manager.ex
+++ b/shard/lib/manager.ex
@@ -26,7 +26,11 @@ defmodule Shard.Manager do
- :shard_db (persistent with DETS)
List of
- { id, manifest, state }
+ { id, manifest, why_have_it, state }
+
+ why_have_it := {:pinned, %MapSet{who requires it...}, %MapSet{who it requires...}}
+ | {:req, %MapSet{who requires it...}, %MapSet{who it requires...}}
+ | {:cached, expiry_date}
- :peer_db (persistent with DETS)
@@ -46,6 +50,9 @@ defmodule Shard.Manager do
require Logger
+ @cache_ttl 3600*24 # 24 hours
+ @clean_cache_every 60 # one minute
+
@shard_db [Application.get_env(:shard, :data_path), "shard_db"] |> Path.join |> String.to_atom
@peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom
@@ -61,24 +68,13 @@ defmodule Shard.Manager do
:ets.new(:shard_procs, [:set, :protected, :named_table])
+ Process.send_after(self(), :clean_cache, 1000)
{:ok, %{}}
end
def handle_call({:find_or_start, shard_id, manifest}, _from, state) do
- case :dets.lookup(@shard_db, shard_id) do
- [] -> :dets.insert(@shard_db, {shard_id, manifest, nil})
- _ -> nil
- end
-
- case :ets.lookup(:shard_procs, {shard_id, nil}) do
- [] ->
- {:ok, pid} = apply(Shard.Manifest.module(manifest), :start_link, [manifest])
- :ets.insert(:shard_procs, {{shard_id, nil}, pid})
- state = Map.put(state, pid, {shard_id, nil})
- {:reply, pid, state}
- pid ->
- {:reply, pid, state}
- end
+ {pid, state} = find_or_start(state, shard_id, manifest)
+ {:reply, pid, state}
end
def handle_cast({:dispatch_to, shard_id, path, pid}, state) do
@@ -100,6 +96,84 @@ defmodule Shard.Manager do
{:noreply, state}
end
+ def handle_cast({:save_state, shard_id, shst}, state) do
+ case :dets.lookup(@shard_db, shard_id) do
+ [{^shard_id, manifest, why_have_it, _old_state}] ->
+ :dets.insert(@shard_db, {shard_id, manifest, why_have_it, shst})
+ end
+ {:noreply, state}
+ end
+
+ def handle_cast({:pin, shard_id}, state) do
+ case :dets.lookup(@shard_db, shard_id) do
+ [{^shard_id, manifest, {:cached, _}, shst}] ->
+ :dets.insert(@shard_db, {shard_id, manifest, {:pinned, %MapSet{}, %MapSet{}}, shst})
+ {pid, state} = find_or_start(state, shard_id, manifest)
+ GenServer.cast(pid, :send_deps)
+ {:noreply, state}
+ [{^shard_id, manifest, {:req, a, b}, shst}] ->
+ :dets.insert(@shard_db, {shard_id, manifest, {:pinned, a, b}, shst})
+ {:noreply, state}
+ _ ->
+ {:noreply, state}
+ end
+ end
+
+ def handle_cast({:unpin, shard_id}, state) do
+ case :dets.lookup(@shard_db, shard_id) do
+ [{^shard_id, manifest, {:pinned, a, b}, shst}] ->
+ if MapSet.size(a) > 0 do
+ :dets.insert(@shard_db, {shard_id, manifest, {:req, a, b}, shst})
+ else
+ for dep <- b do
+ rm_dep_link(shard_id, dep)
+ end
+ :dets.insert(@shard_db, {shard_id, manifest, cached(), shst})
+ end
+ _ -> nil
+ end
+ {:noreply, state}
+ end
+
+ def handle_cast({:dep_list, shard_id, manifests}, state) do
+ case :dets.lookup(@shard_db, shard_id) do
+ [{^shard_id, manifest, {reason, a, b}, shst}] when reason == :pinned or reason == :req ->
+ bnew_pairs = Enum.map(manifests, fn m -> {SData.term_hash(m), m} end)
+ bnew_map = Enum.reduce(bnew_pairs, %{}, fn {id, m}, map -> Map.put(map, id, m) end)
+ bnew_set = Enum.reduce(bnew_pairs, %MapSet{}, fn {id, _m}, ms -> MapSet.put(ms, id) end)
+ state = MapSet.difference(bnew_set, b)
+ |> Enum.reduce(state, fn idadd, state ->
+ add_dep_link(state, shard_id, idadd, bnew_map[idadd])
+ end)
+ for idrm <- MapSet.difference(b, bnew_set) do
+ rm_dep_link(shard_id, idrm)
+ end
+ :dets.insert(@shard_db, {shard_id, manifest, {reason, a, bnew_set}, shst})
+ {:noreply, state}
+ _ ->
+ {:noreply, state}
+ end
+ end
+
+ def handle_info(:clean_cache, state) do
+ currtime = System.os_time :seconds
+
+ shards = :dets.select(@shard_db, [{
+ {:'$1', :_, {:cached, :'$2'}, :_}, [{:<, :'$2', currtime}], [:'$1']}
+ ])
+ for [id] <- shards do
+ case :ets.lookup(:shard_procs, {id, nil}) do
+ [{{^id, nil}, pid}] ->
+ GenServer.cast(pid, :delete_shard)
+ _ -> nil
+ end
+ :dets.delete(@shard_db, id)
+ end
+
+ Process.send_after(self(), :clean_cache, @clean_cache_every * 1000)
+ {:noreply, state}
+ end
+
def handle_info({:DOWN, _, :process, pid, reason}, state) do
handle_info({:EXIT, pid, reason}, state)
end
@@ -114,6 +188,59 @@ defmodule Shard.Manager do
end
end
+ defp find_or_start(state, shard_id, manifest) do
+ case :dets.lookup(@shard_db, shard_id) do
+ [] ->
+ :dets.insert(@shard_db, {shard_id, manifest, cached(), nil})
+ [{^shard_id, ^manifest, {:cached, _}, shst}] ->
+ :dets.insert(@shard_db, {shard_id, manifest, cached(), shst})
+ _ -> nil
+ end
+
+ case :ets.lookup(:shard_procs, {shard_id, nil}) do
+ [] ->
+ {:ok, pid} = apply(Shard.Manifest.module(manifest), :start_link, [manifest])
+ :ets.insert(:shard_procs, {{shard_id, nil}, pid})
+ state = Map.put(state, pid, {shard_id, nil})
+ {pid, state}
+ [{{^shard_id, nil}, pid}] ->
+ {pid, state}
+ end
+ end
+
+ defp cached() do
+ {:cached, System.os_time(:seconds) + @cache_ttl}
+ end
+
+ defp add_dep_link(state, shard_id, id2, m2) do
+ case :dets.lookup(@shard_db, id2) do
+ [{^id2, ^m2, {reason, a, b}, shst}] when reason == :pinned or reason == :req ->
+ :dets.insert(@shard_db, {id2, m2, {reason, MapSet.put(a, shard_id), b}, shst})
+ state
+ _ ->
+ a = MapSet.new() |> MapSet.put(shard_id)
+ :dets.insert(@shard_db, {id2, m2, {:req, a, %MapSet{}}, nil})
+ {pid, state} = find_or_start(state, id2, m2)
+ GenServer.cast(pid, :send_deps)
+ state
+ end
+ end
+
+ defp rm_dep_link(shard_id, id2) do
+ case :dets.lookup(@shard_db, id2) do
+ [{^id2, m2, {reason, a, b}, shst}] when reason == :pinned or reason == :req ->
+ a2 = MapSet.delete(a, shard_id)
+ if reason == :req and MapSet.size(a2) == 0 do
+ :dets.insert(@shard_db, {id2, m2, cached(), shst})
+ for dep <- b do
+ rm_dep_link(id2, dep)
+ end
+ else
+ :dets.insert(@shard_db, {id2, m2, {reason, a2, b}, shst})
+ end
+ end
+ end
+
# ======================
# CALLED BY SNet.TcpConn
@@ -125,7 +252,7 @@ defmodule Shard.Manager do
def incoming(conn_pid, peer_info, auth, {:interested, shards}) do
for shard_id <- shards do
case :dets.lookup(@shard_db, shard_id) do
- [{ ^shard_id, manifest, _ }] ->
+ [{ ^shard_id, manifest, _, _ }] ->
GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info})
pid = case :ets.lookup(:shard_procs, {shard_id, nil}) do
[] ->
@@ -146,7 +273,7 @@ defmodule Shard.Manager do
case :dets.lookup(@shard_db, shard_id) do
[] ->
GenServer.cast(conn_pid, {:send_msg, {:not_interested, shard_id}})
- [{ ^shard_id, manifest, _}] ->
+ [{ ^shard_id, manifest, _, _}] ->
GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info})
pid = case :ets.lookup(:shard_procs, {shard_id, path}) do
[] ->
@@ -190,7 +317,7 @@ defmodule Shard.Manager do
"""
def load_state(shard_id) do
case :dets.lookup(@shard_db, shard_id) do
- [{^shard_id, _, state}] -> state
+ [{^shard_id, _, _, state}] -> state
_ -> nil
end
end
@@ -199,10 +326,7 @@ defmodule Shard.Manager do
Save a state value for a shard
"""
def save_state(shard_id, state) do
- case :dets.lookup(@shard_db, shard_id) do
- [{^shard_id, manifest, _old_state}] ->
- :dets.insert(@shard_db, {shard_id, manifest, state})
- end
+ GenServer.cast(__MODULE__, {:save_state, shard_id, state})
end
@@ -234,9 +358,11 @@ defmodule Shard.Manager do
end
@doc"""
- Return the list of all shards.
+ Return the list of all shards. Returns a list of tuples:
+
+ {id, manifest, why_have_it}
"""
def list_shards() do
- for [x] <- :dets.match(@shard_db, :"$1"), do: x
+ for [{id, m, why, _}] <- :dets.match(@shard_db, :"$1"), do: {id, m, why}
end
end