defmodule SApp.PageStore do
  @moduledoc """
  A module that implements a content-addressable storage (pages,
  identified by the hash of their contents).

  This is not a shard, it is a side process that a shard may use to store its data.

  Uses a DETS table of:

      { page_id, why_have_it }          # waiting for data
      { page_id, why_have_it, data }    # once we have the data

      why_have_it := :root
                   | {:req_by, some_other_page_id}
                   | {:cached, expiry_date}

  TODO: at the moment we are trying to pull all missing pages at once from our peers.
  This can work for metadata that isn't too big but won't work with bigger objects.
  Have a smart strategy where we limit the number of requests currently in-flight but
  still make sure everything gets pulled in. This will also pave the way to selectively
  pulling in pages, for instance if we have a function to give them a priority score and
  a maximum stored page count.

  A `SApp.PageStore` can be used as a `SData.PageStore` in the following way:

      %SApp.PageStore{pid: store_pid}

  or:

      %SApp.PageStore{pid: store_pid, prefer_ask: [connection_pid, ...]}

  In the second case, missing pages will be requested first to the specified peers.
  """

  use GenServer

  @enforce_keys [:pid]
  defstruct [:pid, :prefer_ask]

  @cache_ttl 600           # Ten minutes
  @clean_cache_every 60    # One minute
  @max_failures 4          # Maximum of peers that reply not_found before we abandon

  defmodule State do
    @moduledoc """
    Internal state struct of pagestore process.
    """

    # shard_id, path, netgroup: the arguments given to `SApp.PageStore.start_link/3`
    # store:      handle of the DETS table holding the pages
    # reqs:       map of page hash => MapSet of GenServer callers waiting for that page
    # retries:    map of page hash => number of `:not_found` replies received so far
    # store_path: path of the DETS file on disk
    defstruct [:shard_id, :path, :netgroup, :store, :reqs, :retries, :store_path]
  end

  @doc """
  Starts a page store process for the given shard id and path, exchanging
  pages with the peers of `netgroup`.
  """
  def start_link(shard_id, path, netgroup) do
    GenServer.start_link(__MODULE__, [shard_id, path, netgroup])
  end

  # Registers this process for `{shard_id, path}`, opens the backing DETS file
  # and schedules the first cache cleanup one second from now.
  @impl true
  def init([shard_id, path, netgroup]) do
    Shard.Manager.dispatch_to(shard_id, path, self())

    store_path =
      Path.join([
        Application.get_env(:shard, :data_path),
        "#{Base.encode16(shard_id)}.#{path}"
      ])

    # NOTE(review): this creates one atom per store path; atoms are never
    # garbage-collected, so this presumably relies on the number of stores
    # staying small.
    {:ok, store} = :dets.open_file(String.to_atom(store_path), type: :set)

    Process.send_after(self(), :clean_cache, 1000)

    {:ok,
     %State{
       shard_id: shard_id,
       path: path,
       netgroup: netgroup,
       store: store,
       reqs: %{},
       retries: %{},
       store_path: store_path
     }}
  end

  # :delete_store -- close the table, remove its file and stop the process.
  @impl true
  def handle_call(:delete_store, _from, state) do
    :dets.close(state.store)
    File.rm(state.store_path)
    {:stop, :normal, :ok, state}
  end

  # {:get, key, prefer_ask} -- reply with the page binary right away if we have
  # it; otherwise remember the caller (the reply is sent when the data arrives
  # or when we give up) and, if the page is completely unknown, start pulling
  # it as a cached page.
  def handle_call({:get, key, prefer_ask}, from, state) do
    case :dets.lookup(state.store, key) do
      [{_, _, bin}] ->
        {:reply, bin, state}

      [{_, _}] ->
        {:noreply, add_request(state, key, from)}

      [] ->
        init_rec_pull(state, key, fresh_cached_why(), prefer_ask)
        {:noreply, add_request(state, key, from)}
    end
  end

  # {:put, bin} -- store a page binary and reply with its hash.
  def handle_call({:put, bin}, _from, state) do
    hash = SData.bin_hash(bin)
    store_put(state, hash, bin)
    {:reply, hash, state}
  end

  # {:have_rec, root} -- reply whether `root` and all its dependencies are stored.
  def handle_call({:have_rec, root}, _from, state) do
    {:reply, have_rec(state, root), state}
  end

  # {:rec_pull, hash, ask_to} -- start pulling `hash` (as a cached page) unless
  # the table already has an entry for it.
  @impl true
  def handle_cast({:rec_pull, hash, ask_to}, state) do
    # The call must be parenthesised: written without parentheses, `== []`
    # binds to the key argument, the lookup result is always truthy, and an
    # already-stored page gets overwritten by an empty placeholder.
    if :dets.lookup(state.store, hash) == [] do
      init_rec_pull(state, hash, fresh_cached_why(), ask_to)
    end

    {:noreply, state}
  end

  # {:msg, ...} -- message coming from a peer connection; ignored unless the
  # connection belongs to our netgroup.
  def handle_cast({:msg, conn_pid, auth, _shard_id, _path, msg}, state) do
    if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
      {:noreply, handle_peer_msg(msg, conn_pid, state)}
    else
      {:noreply, state}
    end
  end

  # {:set_roots, roots} -- redefine the set of root pages.
  def handle_cast({:set_roots, roots}, state) do
    cached_why = fresh_cached_why()

    # Set old roots and their deps as cached
    previously_pinned = [
      {{:"$1", :root, :"$2"}, [], [:"$$"]},
      {{:"$1", :root}, [], [:"$$"]},
      {{:"$1", {:req_by, :_}, :"$2"}, [], [:"$$"]},
      {{:"$1", {:req_by, :_}}, [], [:"$$"]}
    ]

    for ent <- :dets.select(state.store, previously_pinned) do
      case ent do
        [id, bin] -> :dets.insert(state.store, {id, cached_why, bin})
        [id] -> :dets.insert(state.store, {id, cached_why})
      end
    end

    # Set new roots as roots
    for root <- roots do
      case :dets.lookup(state.store, root) do
        [{^root, _, bin}] ->
          :dets.insert(state.store, {root, :root, bin})
          rec_set_dep(state, root, SData.term_unbin(bin))

        [{^root, _}] ->
          :dets.insert(state.store, {root, :root})

        [] ->
          init_rec_pull(state, root, :root, [])
      end
    end

    {:noreply, state}
  end

  # :clean_cache -- periodic timer: delete every cached entry (with or without
  # data) whose expiry date is in the past, then re-arm the timer.
  @impl true
  def handle_info(:clean_cache, state) do
    currtime = System.os_time(:second)

    cache_cleanup = [
      {{:_, {:cached, :"$1"}, :_}, [{:<, :"$1", currtime}], [true]},
      {{:_, {:cached, :"$1"}}, [{:<, :"$1", currtime}], [true]}
    ]

    :dets.select_delete(state.store, cache_cleanup)

    Process.send_after(self(), :clean_cache, @clean_cache_every * 1000)
    {:noreply, state}
  end

  # ---- peer messages ----

  # A peer asks for a page: send it back, or tell the peer we do not have it.
  defp handle_peer_msg({:get, key}, conn_pid, state) do
    reply =
      case :dets.lookup(state.store, key) do
        [{_, _, bin}] -> {:info, key, bin}
        _ -> {:not_found, key}
      end

    SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, reply})
    state
  end

  # A peer sends us a page: accept it only if its hash matches and we do not
  # already hold the data, answer pending callers, store it, and pull its
  # missing dependencies from the same peer.
  defp handle_peer_msg({:info, hash, bin}, conn_pid, state) do
    already_have_it = match?([{_, _, _}], :dets.lookup(state.store, hash))

    if SData.bin_hash(bin) == hash and not already_have_it do
      state = answer_requests(state, hash, bin)

      # store_put/3 returns a reason only when we were explicitly waiting for
      # this page; dependencies of unsolicited pages are not pulled.
      case store_put(state, hash, bin) do
        nil ->
          :ok

        rec_why ->
          # Dependencies of a cached page expire with it; otherwise they are
          # kept because this page requires them.
          sub_why =
            case rec_why do
              {:cached, ttl} -> {:cached, ttl}
              _ -> {:req_by, hash}
            end

          value = SData.term_unbin(bin)

          # NOTE(review): a dependency already held as {:cached, _} keeps that
          # reason here even when sub_why is {:req_by, hash} -- confirm whether
          # it should be re-tagged so it does not expire.
          for dep <- SData.Page.refs(value), :dets.lookup(state.store, dep) == [] do
            init_rec_pull(state, dep, sub_why, [conn_pid])
          end
      end

      state
    else
      state
    end
  end

  # A peer does not have a page we asked for: retry with other random peers, or
  # after @max_failures negative replies answer `nil` to everyone waiting.
  defp handle_peer_msg({:not_found, key}, _conn_pid, state) do
    case state.reqs[key] do
      nil ->
        state

      waiting ->
        nretry = Map.get(state.retries, key, 0) + 1

        if nretry < @max_failures do
          ask_random_peers(state, key)
          %{state | retries: Map.put(state.retries, key, nretry)}
        else
          for from <- waiting, do: GenServer.reply(from, nil)

          # NOTE(review): the data-less placeholder for `key` stays in the
          # table, so a later :get will wait without triggering a new pull.
          %{state | reqs: Map.delete(state.reqs, key), retries: Map.delete(state.retries, key)}
        end
    end
  end

  # ---- helpers ----

  # "Why do we have it" reason for a page cached from now for @cache_ttl seconds.
  defp fresh_cached_why do
    {:cached, System.os_time(:second) + @cache_ttl}
  end

  # Registers `from` as waiting for the page `key`.
  defp add_request(state, key, from) do
    waiting = Map.get(state.reqs, key, MapSet.new())
    %{state | reqs: Map.put(state.reqs, key, MapSet.put(waiting, from))}
  end

  # Replies `bin` to every caller waiting for `hash` and forgets the pending
  # request and its retry counter.
  defp answer_requests(state, hash, bin) do
    reqs =
      case state.reqs[hash] do
        nil ->
          state.reqs

        waiting ->
          for from <- waiting, do: GenServer.reply(from, bin)
          Map.delete(state.reqs, hash)
      end

    %{state | reqs: reqs, retries: Map.delete(state.retries, hash)}
  end

  # Stores the data of page `hash`.
  #
  # Returns the reason under which the page had been requested when a data-less
  # placeholder existed, and `nil` otherwise (unknown page, stored as cached; or
  # data already present, left untouched).
  defp store_put(state, hash, bin) do
    case :dets.lookup(state.store, hash) do
      [] ->
        :dets.insert(state.store, {hash, fresh_cached_why(), bin})
        :dets.sync(state.store)
        nil

      [{_, why}] ->
        :dets.insert(state.store, {hash, why, bin})
        :dets.sync(state.store)
        why

      [{_, _, _}] ->
        nil
    end
  end

  # True when `root` is stored with its data and so are, recursively, all the
  # pages it references. Stops at the first missing page.
  defp have_rec(state, root) do
    case :dets.lookup(state.store, root) do
      [{_, _, bin}] ->
        bin
        |> SData.term_unbin()
        |> SData.Page.refs()
        |> Enum.all?(&have_rec(state, &1))

      _ ->
        false
    end
  end

  # Requests page `key` from the peers in `prefer_ask` (or from random peers of
  # the netgroup when that list is empty or not a list), then records a
  # data-less placeholder carrying the reason `why`.
  defp init_rec_pull(state, key, why, prefer_ask) do
    case prefer_ask do
      [_ | _] ->
        for peer <- prefer_ask do
          SNet.Manager.send_pid(peer, {state.shard_id, state.path, {:get, key}})
        end

      _ ->
        ask_random_peers(state, key)
    end

    :dets.insert(state.store, {key, why})
  end

  # Marks every page referenced by `val0` (the decoded page `hash`) as required
  # by `hash`, recursing into the ones we hold and pulling the missing ones.
  defp rec_set_dep(state, hash, val0) do
    for dep <- SData.Page.refs(val0) do
      case :dets.lookup(state.store, dep) do
        [{^dep, _, bin}] ->
          :dets.insert(state.store, {dep, {:req_by, hash}, bin})
          rec_set_dep(state, dep, SData.term_unbin(bin))

        [{^dep, _}] ->
          :dets.insert(state.store, {dep, {:req_by, hash}})

        [] ->
          init_rec_pull(state, dep, {:req_by, hash}, [])
      end
    end
  end

  # Broadcasts a request for page `key` to the netgroup, with `nmax: 3`.
  defp ask_random_peers(state, key) do
    SNet.Group.broadcast(state.netgroup, {state.shard_id, state.path, {:get, key}}, nmax: 3)
  end

  defimpl SData.PageStore do
    # Serializes `page`, stores it and returns `{hash, store}`.
    def put(store, page) do
      bin = SData.term_bin(page)
      hash = GenServer.call(store.pid, {:put, bin})
      {hash, store}
    end

    # Returns the decoded page, or `nil` when the store replied `nil` or the
    # call timed out.
    def get(store, hash) do
      try do
        case GenServer.call(store.pid, {:get, hash, store.prefer_ask}) do
          nil -> nil
          bin -> SData.term_unbin(bin)
        end
      catch
        :exit, {:timeout, _} -> nil
      end
    end

    # Asks `store` to pull `hash` from the peers preferred by `other_store`.
    def copy(store, other_store, hash) do
      GenServer.cast(store.pid, {:rec_pull, hash, other_store.prefer_ask})
      store
    end

    def free(store, _hash) do
      store ## DO SOMETHING???
    end
  end

  # ====================
  # PAGE STORE INTERFACE
  # ====================

  @doc """
  Returns `true` if the page store currently stores the specified root page
  and all its dependencies, recursively.
  """
  def have_rec?(pid, root) do
    GenServer.call(pid, {:have_rec, root})
  end

  @doc """
  Define the set of root pages we are interested in.
  This will start pulling in the defined pages and all their dependencies
  recursively if we don't have them.
  """
  def set_roots(pid, roots) do
    GenServer.cast(pid, {:set_roots, roots})
  end

  @doc """
  Delete the page store. The process is stopped and the data file is deleted from disk.
  """
  def delete_store(pid) do
    GenServer.call(pid, :delete_store)
  end
end