defmodule SApp.PageStore do
@moduledoc """
A module that implements a content-addressable storage (pages,
identified by the hash of their contents).
This is not a shard, it is a side process that a shard may use to store its data.
Uses a DETS table of:
{ page_id, why_have_it } # waiting for data
{ page_id, why_have_it, data } # once we have the data
why_have_it := :root
| {:req_by, some_other_page_id}
| {:cached, expiry_date}
TODO: at the moment we are trying to pull all missing pages at once from our peers.
This can work for metadata that isn't too big but won't work with bigger objects.
Have a smart strategy where we limit the number of requests currently in-flight but
still make sure everything gets pulled in. This will also pave the way to selectively
pulling in pages, for instance if we have a function to give them a priority score and
a maximum stored page count.
A `SApp.PageStore` can be used as a `SData.PageStore` in the following way:
%SApp.PageStore{pid: store_pid}
or:
%SApp.PageStore{pid: store_pid, prefer_ask: [connection_pid, ...]}
In the second case, missing pages will be requested first to the specified peers.
"""
use GenServer
# Handle struct through which this store is used as a `SData.PageStore`:
# `pid` (mandatory) is the page store process, `prefer_ask` is an optional list
# of peer connections that are asked first when a page is missing.
@enforce_keys [:pid]
defstruct [:pid, :prefer_ask]
@cache_ttl 600 # Lifetime of a `{:cached, expiry}` entry, in seconds (ten minutes)
@clean_cache_every 60 # Delay between two cache cleanups, in seconds (one minute)
@max_failures 4 # Number of :not_found replies for a page at which we abandon and answer nil to waiting callers
defmodule State do
  @moduledoc """
  State held by the page store process.

  Fields:

    * `shard_id`, `path` - identify this store; used to register with the
      shard manager and to tag every message sent to peers
    * `netgroup` - group used to authorize incoming messages and to pick
      random peers to query
    * `store` - name of the opened DETS table
    * `reqs` - map of page id to the set of callers waiting for that page
    * `retries` - map of page id to the number of `:not_found` replies received
    * `store_path` - path of the DETS file on disk
  """
  defstruct shard_id: nil,
            path: nil,
            netgroup: nil,
            store: nil,
            reqs: nil,
            retries: nil,
            store_path: nil
end
@doc """
Starts a page store process for the given shard id and path.

`netgroup` is the group whose members are allowed to exchange pages with
this store. Returns whatever `GenServer.start_link/2` returns.
"""
def start_link(shard_id, path, netgroup) do
  init_args = [shard_id, path, netgroup]
  GenServer.start_link(__MODULE__, init_args)
end
# GenServer callback: registers this process with the shard manager for
# `{shard_id, path}`, opens the DETS file backing the store and schedules the
# first cache cleanup one second later.
#
# The DETS file lives under the `:data_path` directory of the `:shard`
# application and is named "<hex shard id>.<path>".
def init([shard_id, path, netgroup]) do
  Shard.Manager.dispatch_to(shard_id, path, self())

  store_path =
    Path.join([
      Application.get_env(:shard, :data_path),
      "#{Base.encode16(shard_id)}.#{path}"
    ])

  # A DETS table name may be any term, so the path string itself is used as
  # the name and the file is given explicitly. This avoids creating a new
  # atom (never garbage collected) for every store that is opened.
  {:ok, store} =
    :dets.open_file(store_path, file: String.to_charlist(store_path), type: :set)

  Process.send_after(self(), :clean_cache, 1000)

  {:ok,
   %State{
     shard_id: shard_id,
     path: path,
     netgroup: netgroup,
     store: store,
     reqs: %{},
     retries: %{},
     store_path: store_path
   }}
end
# Closes the DETS table, removes its file from disk, replies `:ok` and stops
# the process normally.
def handle_call(:delete_store, _from, state) do
  %{store: table, store_path: file} = state
  :dets.close(table)
  File.rm(file)
  {:stop, :normal, :ok, state}
end
# Fetches the binary contents of page `key`.
#
#   * page stored with its data: reply immediately with the binary
#   * page known but still waiting for data: register the caller, which is
#     answered once the data (or a definitive failure) arrives
#   * page unknown: start pulling it as a cached page (asking `prefer_ask`
#     first when it is a non-empty list), then register the caller
def handle_call({:get, key, prefer_ask}, from, state) do
  case :dets.lookup(state.store, key) do
    [{_, _, bin}] ->
      {:reply, bin, state}

    [{_, _}] ->
      {:noreply, add_request(state, key, from)}

    [] ->
      # `:second` is the supported time unit; the plural form is deprecated.
      why = {:cached, System.os_time(:second) + @cache_ttl}
      init_rec_pull(state, key, why, prefer_ask)
      {:noreply, add_request(state, key, from)}
  end
end
# Stores a page given as a binary and replies with its hash, which is the
# identifier of the page.
def handle_call({:put, bin}, _from, state) do
  page_id = SData.bin_hash(bin)
  _ = store_put(state, page_id, bin)
  {:reply, page_id, state}
end
# Replies `true` when `root` and all pages it references, recursively, are
# stored with their data.
def handle_call({:have_rec, root}, _from, state) do
  complete? = have_rec(state, root)
  {:reply, complete?, state}
end
# Records `from` as a caller waiting for page `key` and returns the updated
# state. Waiting callers are kept in a set per page id.
defp add_request(state, key, from) do
  waiting =
    case Map.get(state.reqs, key) do
      nil -> MapSet.new([from])
      callers -> MapSet.put(callers, from)
    end

  %{state | reqs: Map.put(state.reqs, key, waiting)}
end
# Writes the data of page `hash` to the DETS table.
#
# Returns:
#   * `nil` when the page was unknown: it is stored as a cached page expiring
#     `@cache_ttl` seconds from now
#   * the recorded reason (`why`) when the page was awaited without data:
#     the data is attached and the reason is kept
#   * `nil` when the page was already stored with its data (nothing is written)
defp store_put(state, hash, bin) do
  case :dets.lookup(state.store, hash) do
    [] ->
      # `:second` is the supported time unit; the plural form is deprecated.
      expiry = System.os_time(:second) + @cache_ttl
      :dets.insert(state.store, {hash, {:cached, expiry}, bin})
      :dets.sync(state.store)
      nil

    [{_, why}] ->
      :dets.insert(state.store, {hash, why, bin})
      :dets.sync(state.store)
      why

    [{_, _, _}] ->
      nil
  end
end
# Returns `true` when page `root` is stored with its data and the same holds,
# recursively, for every page it references; `false` otherwise.
defp have_rec(state, root) do
  case :dets.lookup(state.store, root) do
    [{_, _, bin}] ->
      # Enum.all?/2 stops at the first missing page instead of walking every
      # subtree and building an intermediate list of booleans.
      bin
      |> SData.term_unbin()
      |> SData.Page.refs()
      |> Enum.all?(&have_rec(state, &1))

    _ ->
      false
  end
end
# Requests page `key` from peers and records it in the table as awaited for
# reason `why` (an entry without data).
#
# When `prefer_ask` is a non-empty list the request goes to each of these
# peers; otherwise random members of the net group are asked.
defp init_rec_pull(state, key, why, prefer_ask) do
  if is_list(prefer_ask) and prefer_ask != [] do
    Enum.each(prefer_ask, fn peer ->
      SNet.Manager.send_pid(peer, {state.shard_id, state.path, {:get, key}})
    end)
  else
    ask_random_peers(state, key)
  end

  :dets.insert(state.store, {key, why})
end
# Starts pulling page `hash` as a cached page, asking the peers in `ask_to`
# first, unless the table already has an entry for it (with or without data).
def handle_cast({:rec_pull, hash, ask_to}, state) do
  if :dets.lookup(state.store, hash) == [] do
    # `:second` is the supported time unit; the plural form is deprecated.
    why = {:cached, System.os_time(:second) + @cache_ttl}
    init_rec_pull(state, hash, why, ask_to)
  end

  {:noreply, state}
end
# Handles a message received from a peer connection. Messages from connections
# that are not members of the net group are dropped.
#
#   * `{:get, key}`: answer `{:info, key, bin}` when the page is stored with
#     its data, `{:not_found, key}` otherwise
#   * `{:info, hash, bin}`: accepted only if `bin` hashes to `hash` and we do
#     not already hold the data. Waiting callers are answered, the page is
#     stored, and if we were awaiting it, its missing references are requested
#     from the same peer
#   * `{:not_found, key}`: if callers are waiting for `key`, retry with random
#     peers; when the number of failures reaches `@max_failures`, answer `nil`
#     to the callers and forget the request
#   * anything else is ignored
def handle_cast({:msg, conn_pid, auth, _shard_id, _path, msg}, state) do
  if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
    state =
      case msg do
        {:get, key} ->
          reply =
            case :dets.lookup(state.store, key) do
              [{_, _, bin}] -> {:info, key, bin}
              _ -> {:not_found, key}
            end

          SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, reply})
          state

        {:info, hash, bin} ->
          already_have_it = match?([{_, _, _}], :dets.lookup(state.store, hash))

          if SData.bin_hash(bin) == hash and not already_have_it do
            reqs =
              case state.reqs[hash] do
                nil ->
                  state.reqs

                waiting ->
                  Enum.each(waiting, &GenServer.reply(&1, bin))
                  Map.delete(state.reqs, hash)
              end

            state = %{state | reqs: reqs, retries: Map.delete(state.retries, hash)}

            # store_put/3 returns a reason only when the page was awaited.
            rec_why = store_put(state, hash, bin)

            if rec_why != nil do
              # References of a cached page inherit its expiry; references of
              # any other page are recorded as required by that page.
              sub_why =
                case rec_why do
                  {:cached, ttl} -> {:cached, ttl}
                  _ -> {:req_by, hash}
                end

              value = SData.term_unbin(bin)

              for dep <- SData.Page.refs(value) do
                if :dets.lookup(state.store, dep) == [] do
                  init_rec_pull(state, dep, sub_why, [conn_pid])
                end
              end
            end

            state
          else
            state
          end

        {:not_found, key} ->
          if state.reqs[key] != nil do
            nretry = Map.get(state.retries, key, 0) + 1

            if nretry < @max_failures do
              ask_random_peers(state, key)
              %{state | retries: Map.put(state.retries, key, nretry)}
            else
              Enum.each(state.reqs[key], &GenServer.reply(&1, nil))

              %{
                state
                | reqs: Map.delete(state.reqs, key),
                  retries: Map.delete(state.retries, key)
              }
            end
          else
            state
          end

        _ ->
          # Payloads come from remote peers: an unexpected one must not crash
          # the store, so it is dropped.
          state
      end

    {:noreply, state}
  else
    {:noreply, state}
  end
end
# Replaces the set of root pages.
#
# Every entry currently marked `:root` or `{:req_by, _}` is first re-marked as
# cached, expiring `@cache_ttl` seconds from now. Then each new root is marked
# `:root`: if its data is stored, its references are re-marked recursively as
# required by their parent; if it is unknown, it is requested from random peers.
def handle_cast({:set_roots, roots}, state) do
  # `:second` is the supported time unit; the plural form is deprecated.
  cached_why = {:cached, System.os_time(:second) + @cache_ttl}

  # Set old roots and their deps as cached
  old_entries =
    :dets.select(state.store, [
      {{:"$1", :root, :"$2"}, [], [:"$$"]},
      {{:"$1", :root}, [], [:"$$"]},
      {{:"$1", {:req_by, :_}, :"$2"}, [], [:"$$"]},
      {{:"$1", {:req_by, :_}}, [], [:"$$"]}
    ])

  for ent <- old_entries do
    case ent do
      [id, bin] ->
        :dets.insert(state.store, {id, cached_why, bin})

      [id] ->
        :dets.insert(state.store, {id, cached_why})
    end
  end

  # Set new roots as roots
  for root <- roots do
    case :dets.lookup(state.store, root) do
      [{^root, _, bin}] ->
        :dets.insert(state.store, {root, :root, bin})
        rec_set_dep(state, root, SData.term_unbin(bin))

      [{^root, _}] ->
        :dets.insert(state.store, {root, :root})

      [] ->
        init_rec_pull(state, root, :root, [])
    end
  end

  {:noreply, state}
end
# Marks every page referenced by `val0` (the decoded page `hash`) as required
# by `hash`, recursing into references whose data is already stored.
# References with no entry at all are requested from random peers.
defp rec_set_dep(state, hash, val0) do
  required_by = {:req_by, hash}

  val0
  |> SData.Page.refs()
  |> Enum.map(fn dep ->
    case :dets.lookup(state.store, dep) do
      [{^dep, _, bin}] ->
        :dets.insert(state.store, {dep, required_by, bin})
        rec_set_dep(state, dep, SData.term_unbin(bin))

      [{^dep, _}] ->
        :dets.insert(state.store, {dep, required_by})

      [] ->
        init_rec_pull(state, dep, required_by, [])
    end
  end)
end
# Periodic cleanup: deletes every cached entry (with or without data) whose
# expiry date is in the past, then schedules the next cleanup in
# `@clean_cache_every` seconds.
def handle_info(:clean_cache, state) do
  # `:second` is the supported time unit; the plural form is deprecated.
  currtime = System.os_time(:second)

  cache_cleanup = [
    {{:_, {:cached, :"$1"}, :_}, [{:<, :"$1", currtime}], [true]},
    {{:_, {:cached, :"$1"}}, [{:<, :"$1", currtime}], [true]}
  ]

  :dets.select_delete(state.store, cache_cleanup)
  Process.send_after(self(), :clean_cache, @clean_cache_every * 1000)
  {:noreply, state}
end
# Broadcasts a request for page `key` to at most three members of the net group.
defp ask_random_peers(state, key) do
  %{netgroup: group, shard_id: shard_id, path: path} = state
  request = {shard_id, path, {:get, key}}
  SNet.Group.broadcast(group, request, nmax: 3)
end
defimpl SData.PageStore do
  # Serializes `page`, stores it in the page store process and returns its
  # hash together with the (unchanged) store handle.
  def put(store, page) do
    encoded = SData.term_bin(page)
    page_id = GenServer.call(store.pid, {:put, encoded})
    {page_id, store}
  end

  # Returns the decoded page for `hash`, or `nil` when the store answers `nil`
  # or the call to the store process times out.
  def get(store, hash) do
    case GenServer.call(store.pid, {:get, hash, store.prefer_ask}) do
      nil -> nil
      encoded -> SData.term_unbin(encoded)
    end
  catch
    :exit, {:timeout, _} -> nil
  end

  # Asks the store process to pull page `hash`, querying first the peers
  # preferred by `other_store`. Returns the store handle immediately.
  def copy(store, other_store, hash) do
    GenServer.cast(store.pid, {:rec_pull, hash, other_store.prefer_ask})
    store
  end

  # Currently a no-op. TODO: do something?
  def free(store, _hash), do: store
end
# ====================
# PAGE STORE INTERFACE
# ====================
@doc """
Tells whether the page store holds the data of the root page `root` and of
every page it depends on, recursively.

Returns `true` only when nothing is missing.
"""
def have_rec?(pid, root), do: GenServer.call(pid, {:have_rec, root})
@doc """
Sets the root pages we want to keep. Missing roots and missing dependencies
are pulled in recursively.

The request is asynchronous: this function returns as soon as it has been
sent to the page store process.
"""
def set_roots(pid, roots), do: GenServer.cast(pid, {:set_roots, roots})
@doc """
Deletes the page store: its data file is removed from disk and the process
is stopped.
"""
def delete_store(pid), do: GenServer.call(pid, :delete_store)
end