defmodule SApp.PageStore do
@moduledoc """
A module that implements a content-addressable storage (pages,
identified by the hash of their contents).
This is not a shard; it is a side process that a shard may use to store its data.
Uses a DETS table of:
{ page_id, why_have_it } -- waiting for data
{ page_id, why_have_it, data } -- once we have the data
why_have_it := :root
| {:req_by, some_other_page_id}
| {:cached, expiry_date}
"""
# Brings in the GenServer behaviour; the callbacks are implemented below.
use GenServer
# Client-side handle on a page store. The SData.PageStore protocol is
# implemented for this struct at the bottom of the module.
#   pid        -- the page store GenServer that calls and casts are sent to (required)
#   prefer_ask -- peers to ask first when a page is missing; a non-empty list is
#                 used as given, anything else falls back to asking random peers
@enforce_keys [:pid]
defstruct [:pid, :prefer_ask]
@cache_ttl 600 # Seconds added to the current time to compute a cached page's expiry (ten minutes)
@clean_cache_every 60 # Seconds between two cache clean-up passes (one minute)
@max_failures 4 # Number of not_found replies for a page at which its pending requests are answered with nil
# Internal state of the page store process.
defmodule State do
# shard_id, path -- registered with Shard.Manager in init/1 and used to tag every message sent to peers
# netgroup       -- group used to authorise incoming peers and to broadcast page requests
# store          -- handle of the DETS table holding the pages
# reqs           -- map of page id => MapSet of GenServer callers waiting for that page
# retries        -- map of page id => number of not_found replies counted so far
defstruct [:shard_id, :path, :netgroup, :store, :reqs, :retries]
end
# Starts a page store process for the given shard id, path and network group.
# The three values are handed to init/1 as a single list.
def start_link(shard_id, path, netgroup) do
  init_args = [shard_id, path, netgroup]
  GenServer.start_link(__MODULE__, init_args)
end
# GenServer initialisation.
# Registers this process with Shard.Manager for {shard_id, path}, opens the
# DETS table that stores the pages, and schedules the first cache clean-up
# one second from now.
def init([shard_id, path, netgroup]) do
  Shard.Manager.dispatch_to(shard_id, path, self())

  # The table is named after its file path:
  # <data_path>/<hex-encoded shard id>.<path>
  data_dir = Application.get_env(:shard, :data_path)
  file_name = "#{Base.encode16(shard_id)}.#{path}"
  table_name = String.to_atom(Path.join([data_dir, file_name]))
  {:ok, store} = :dets.open_file(table_name, type: :set)

  Process.send_after(self(), :clean_cache, 1000)

  initial_state = %State{
    shard_id: shard_id,
    path: path,
    netgroup: netgroup,
    store: store,
    reqs: %{},
    retries: %{}
  }

  {:ok, initial_state}
end
# Fetches the page stored under `key`.
#   * page present        -> reply immediately with its binary content
#   * page already wanted -> register the caller; it is answered later via GenServer.reply
#   * page unknown        -> start pulling it as a cached page, then register the caller
def handle_call({:get, key, prefer_ask}, from, state) do
  case :dets.lookup(state.store, key) do
    [{_id, _why, data}] ->
      {:reply, data, state}

    [{_id, _why}] ->
      {:noreply, add_request(state, key, from)}

    [] ->
      expiry = System.os_time(:seconds) + @cache_ttl
      init_rec_pull(state, key, {:cached, expiry}, prefer_ask)
      {:noreply, add_request(state, key, from)}
  end
end
# Stores a serialised page and replies with its hash, which is the page id.
def handle_call({:put, bin}, _from, state) do
  page_id = SData.bin_hash(bin)
  # The reason returned by store_put/3 is not needed here.
  _why = store_put(state, page_id, bin)
  {:reply, page_id, state}
end
# Replies with the result of have_rec/2 for `root`: whether that page and
# every page reachable from it are stored.
def handle_call({:have_rec, root}, _from, state) do
  complete = have_rec(state, root)
  {:reply, complete, state}
end
# Records that the caller `from` is waiting for page `key`.
# Returns the state with `from` added to the set of waiters for that key;
# the set is created when this is the first waiter.
defp add_request(state, key, from) do
  waiters =
    case Map.get(state.reqs, key) do
      nil -> MapSet.new([from])
      existing -> MapSet.put(existing, from)
    end

  %{state | reqs: Map.put(state.reqs, key, waiters)}
end
# Writes the content `bin` of page `hash` into the table.
# Returns the reason the page was wanted when it was pending, nil otherwise:
#   * content already stored -> nothing is written, returns nil
#   * page pending           -> content is attached, its reason is kept and returned
#   * page unknown           -> stored as a cached page with a fresh expiry, returns nil
defp store_put(state, hash, bin) do
  table = state.store

  case :dets.lookup(table, hash) do
    [{_id, _why, _data}] ->
      nil

    [{_id, pending_why}] ->
      :dets.insert(table, {hash, pending_why, bin})
      pending_why

    [] ->
      expiry = System.os_time(:seconds) + @cache_ttl
      :dets.insert(table, {hash, {:cached, expiry}, bin})
      nil
  end
end
# Tells whether the page `root` and, recursively, every page it references
# are stored with their content.
# Returns false as soon as one page is missing or still pending: Enum.all?/2
# stops at the first failure, so the remaining sub-trees are not looked up.
defp have_rec(state, root) do
  case :dets.lookup(state.store, root) do
    [{_, _, bin}] ->
      bin
      |> SData.term_unbin()
      |> SData.Page.refs()
      |> Enum.all?(&have_rec(state, &1))

    _ ->
      false
  end
end
# Starts fetching page `key` from the network and records it as pending.
# When `prefer_ask` is a non-empty list, each of those peers is sent a
# {:get, key} request; otherwise the request goes to random peers.
# A {key, why} entry without data is then written to the table; note that this
# replaces whatever entry the table held for `key`.
defp init_rec_pull(state, key, why, prefer_ask) do
  if match?([_ | _], prefer_ask) do
    request = {state.shard_id, state.path, {:get, key}}
    Enum.each(prefer_ask, &SNet.Manager.send_pid(&1, request))
  else
    ask_random_peers(state, key)
  end

  :dets.insert(state.store, {key, why})
end
# Asks for page `hash` to be pulled from the peers in `ask_to` and kept as a
# cached page, unless the table already has an entry for it.
def handle_cast({:rec_pull, hash, ask_to}, state) do
  # The parentheses matter: `:dets.lookup state.store, hash == []` parses as
  # `:dets.lookup(state.store, hash == [])`, whose result is always truthy.
  # The pull would then always run and replace a stored page with a pending entry.
  if :dets.lookup(state.store, hash) == [] do
    why = {:cached, System.os_time(:seconds) + @cache_ttl}
    init_rec_pull(state, hash, why, ask_to)
  end

  {:noreply, state}
end
# Handles a message received from the network connection `conn_pid`.
# Messages from connections that are not members of our network group are
# ignored; the others are processed by handle_peer_msg/3.
def handle_cast({:msg, conn_pid, auth, _shard_id, _path, msg}, state) do
  if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
    {:noreply, handle_peer_msg(msg, conn_pid, state)}
  else
    # Must still be a valid GenServer return value: returning the bare state
    # here would crash the process.
    {:noreply, state}
  end
end

# A peer asks for a page: send its content when we have it, :not_found otherwise
# (including when the page is only pending on our side).
defp handle_peer_msg({:get, key}, conn_pid, state) do
  answer =
    case :dets.lookup(state.store, key) do
      [{_, _, bin}] -> {:info, key, bin}
      _ -> {:not_found, key}
    end

  SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, answer})
  state
end

# A peer sends the content of a page. It is accepted only when the content
# hashes to the announced id and we do not already hold it. Waiting callers are
# then answered, the page is stored and, when it was a page we were waiting
# for, its missing dependencies are requested from the same peer.
defp handle_peer_msg({:info, hash, bin}, conn_pid, state) do
  already_have_it = match?([{_, _, _}], :dets.lookup(state.store, hash))

  if SData.bin_hash(bin) == hash and not already_have_it do
    state.reqs
    |> Map.get(hash, [])
    |> Enum.each(&GenServer.reply(&1, bin))

    state = %{
      state
      | reqs: Map.delete(state.reqs, hash),
        retries: Map.delete(state.retries, hash)
    }

    case store_put(state, hash, bin) do
      nil ->
        # The page was not pending, so nothing is pulled recursively.
        :ok

      rec_why ->
        # Dependencies of a cached page share its expiry; dependencies of any
        # other page are kept because this page requires them.
        sub_why =
          case rec_why do
            {:cached, ttl} -> {:cached, ttl}
            _ -> {:req_by, hash}
          end

        # Only pull dependencies that have no entry at all: init_rec_pull/4
        # writes a data-less entry and would overwrite a stored page.
        # The lookup needs explicit parentheses, otherwise `dep == []` becomes
        # the lookup key and the condition is always truthy.
        # NOTE(review): a dependency that already has an entry keeps its current
        # reason (e.g. :cached) and is not re-tagged with sub_why -- confirm
        # whether that is intended.
        for dep <- SData.Page.refs(SData.term_unbin(bin)),
            :dets.lookup(state.store, dep) == [] do
          init_rec_pull(state, dep, sub_why, [conn_pid])
        end
    end

    state
  else
    state
  end
end

# A peer does not have the page. When callers are waiting for it, either ask
# other random peers again or, once @max_failures such replies have been
# counted, give up and answer nil to every waiting caller.
defp handle_peer_msg({:not_found, key}, _conn_pid, state) do
  case state.reqs[key] do
    nil ->
      state

    waiting ->
      nretry = Map.get(state.retries, key, 0) + 1

      if nretry < @max_failures do
        ask_random_peers(state, key)
        %{state | retries: Map.put(state.retries, key, nretry)}
      else
        Enum.each(waiting, &GenServer.reply(&1, nil))

        %{
          state
          | reqs: Map.delete(state.reqs, key),
            retries: Map.delete(state.retries, key)
        }
      end
  end
end
# Replaces the set of root pages.
# Every entry currently kept as a root or as a dependency ({:req_by, _}) is
# first turned into a cached entry with a fresh expiry. Then each page in
# `roots` is marked :root again: its dependencies are re-tagged when its
# content is available, and it is pulled from the network when it is unknown.
def handle_cast({:set_roots, roots}, state) do
  table = state.store
  demoted_why = {:cached, System.os_time(:seconds) + @cache_ttl}

  # Matches roots and dependencies, with or without data; each result is the
  # list of bound variables: [id, bin] or [id].
  pinned_entries = [
    {{:"$1", :root, :"$2"}, [], [:"$$"]},
    {{:"$1", :root}, [], [:"$$"]},
    {{:"$1", {:req_by, :_}, :"$2"}, [], [:"$$"]},
    {{:"$1", {:req_by, :_}}, [], [:"$$"]}
  ]

  table
  |> :dets.select(pinned_entries)
  |> Enum.each(fn
    [id, bin] -> :dets.insert(table, {id, demoted_why, bin})
    [id] -> :dets.insert(table, {id, demoted_why})
  end)

  Enum.each(roots, fn root ->
    case :dets.lookup(table, root) do
      [{^root, _, bin}] ->
        :dets.insert(table, {root, :root, bin})
        rec_set_dep(state, root, SData.term_unbin(bin))

      [{^root, _}] ->
        :dets.insert(table, {root, :root})

      [] ->
        init_rec_pull(state, root, :root, [])
    end
  end)

  {:noreply, state}
end
# Marks every page referenced by `val0` (the decoded page `hash`) as required
# by `hash`, walking down recursively through the pages whose content is
# stored. Referenced pages without any entry are pulled from random peers.
defp rec_set_dep(state, hash, val0) do
  table = state.store
  why = {:req_by, hash}

  for dep <- SData.Page.refs(val0) do
    case :dets.lookup(table, dep) do
      [{^dep, _, bin}] ->
        :dets.insert(table, {dep, why, bin})
        rec_set_dep(state, dep, SData.term_unbin(bin))

      [{^dep, _}] ->
        :dets.insert(table, {dep, why})

      [] ->
        init_rec_pull(state, dep, why, [])
    end
  end
end
# Periodic cache clean-up: deletes every cached entry (with or without data)
# whose expiry is earlier than the current time, then schedules the next pass.
def handle_info(:clean_cache, state) do
  now = System.os_time(:seconds)
  expired = [{:<, :"$1", now}]

  match_spec = [
    {{:_, {:cached, :"$1"}, :_}, expired, [true]},
    {{:_, {:cached, :"$1"}}, expired, [true]}
  ]

  :dets.select_delete(state.store, match_spec)
  Process.send_after(self(), :clean_cache, @clean_cache_every * 1000)
  {:noreply, state}
end
# Broadcasts a {:get, key} request for this shard and path to the network group.
# NOTE(review): the last argument (3) is presumably the number of peers to
# contact -- confirm against SNet.Group.broadcast.
def ask_random_peers(state, key) do
  request = {state.shard_id, state.path, {:get, key}}
  SNet.Group.broadcast(state.netgroup, request, 3)
end
# Implementation of the SData.PageStore protocol for the handle struct defined
# by the enclosing module. Every operation is delegated to the page store
# process referenced by `store.pid`.
defimpl SData.PageStore do
  # Serialises `page`, stores it and returns {hash, store}.
  def put(store, page) do
    hash = GenServer.call(store.pid, {:put, SData.term_bin(page)})
    {hash, store}
  end

  # Returns the decoded page stored under `hash`, or nil when the store
  # answers nil or the call times out.
  def get(store, hash) do
    case GenServer.call(store.pid, {:get, hash, store.prefer_ask}) do
      nil -> nil
      bin -> SData.term_unbin(bin)
    end
  catch
    :exit, {:timeout, _} -> nil
  end

  # Asks the store to pull page `hash`, preferring the peers of `other_store`.
  # Asynchronous: returns `store` without waiting for the page to arrive.
  def copy(store, other_store, hash) do
    GenServer.cast(store.pid, {:rec_pull, hash, other_store.prefer_ask})
    store
  end

  # TODO: freeing a page is not implemented; the store is returned unchanged.
  def free(store, _hash) do
    store
  end
end
end