defmodule SApp.PageStore do @moduledoc """ A module that implements a content-adressable storage (pages, identified by the hash of their contents). This is not a shard, it is a side process that a shard may use to store its data. Uses an ETS table of: { page_id, why_have_it } -- waiting for data { page_id, why_have_it, data } -- once we have the data why_have_it := :root | {:req_by, some_other_page_id} | {:cached, expiry_date} """ use GenServer @enforce_keys [:pid] defstruct [:pid, :prefer_ask] @cache_ttl 600 # Ten minutes @clean_cache_every 60 # One minute @max_failures 4 # Maximum of peers that reply not_found before we abandon defmodule State do defstruct [:shard_id, :path, :netgroup, :store, :reqs, :retries] end def start_link(shard_id, path, netgroup) do GenServer.start_link(__MODULE__, [shard_id, path, netgroup]) end def init([shard_id, path, netgroup]) do Shard.Manager.dispatch_to(shard_id, path, self()) store_path = [Application.get_env(:shard, :data_path), "#{shard_id|>Base.encode16}.#{path}"] |> Path.join |> String.to_atom {:ok, store} = :dets.open_file store_path, [type: :set] Process.send_after(self(), :clean_cache, 1000) {:ok, %State{shard_id: shard_id, path: path, netgroup: netgroup, store: store, reqs: %{}, retries: %{}}} end def handle_call({:get, key, prefer_ask}, from, state) do case :dets.lookup state.store, key do [{_, _, bin}] -> {:reply, bin, state} [{_, _}] -> state = add_request(state, key, from) {:noreply, state} [] -> why = {:cached, System.os_time(:seconds) + @cache_ttl} init_rec_pull(state, key, why, prefer_ask) state = add_request(state, key, from) {:noreply, state} end end def handle_call({:put, bin}, _from, state) do hash = SData.bin_hash bin store_put(state, hash, bin) {:reply, hash, state} end def handle_call({:have_rec, root}, _from, state) do {:reply, have_rec(state, root), state} end defp add_request(state, key, from) do reqs_key = case state.reqs[key] do nil -> MapSet.put(MapSet.new(), from) ms -> MapSet.put(ms, from) end put_in(state.reqs[key], reqs_key) end defp store_put(state, hash, bin) do case :dets.lookup state.store, hash do [] -> :dets.insert state.store, {hash, {:cached, System.os_time(:seconds) + @cache_ttl}, bin} nil [{_, why}] -> :dets.insert state.store, {hash, why, bin} why [{_, _, _}] -> nil end end defp have_rec(state, root) do case :dets.lookup state.store, root do [{_, _, bin}] -> pg = SData.term_unbin bin pg |> SData.Page.refs |> Enum.map(&(have_rec(state, &1))) |> Enum.all? _ -> false end end defp init_rec_pull(state, key, why, prefer_ask) do case prefer_ask do [_|_] -> for peer <- prefer_ask do SNet.Manager.send_pid(peer, {state.shard_id, state.path, {:get, key}}) end _ -> ask_random_peers(state, key) end :dets.insert state.store, {key, why} end def handle_cast({:rec_pull, hash, ask_to}, state) do if :dets.lookup state.store, hash == [] do why = {:cached, System.os_time(:seconds) + @cache_ttl} init_rec_pull(state, hash, why, ask_to) end {:noreply, state} end def handle_cast({:msg, conn_pid, auth, _shard_id, _path, msg}, state) do if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do state else state = case msg do {:get, key} -> case :dets.lookup state.store, key do [{_, _, bin}] -> SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:info, key, bin}}) _ -> SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:not_found, key}}) end state {:info, hash, bin} -> already_have_it = case :dets.lookup state.store, hash do [{_, _, _}] -> true _ -> false end if SData.bin_hash(bin) == hash and not already_have_it do reqs = case state.reqs[hash] do nil -> state.reqs pids -> for pid <- pids do GenServer.reply(pid, bin) end Map.delete(state.reqs, hash) end state = %{state | reqs: reqs, retries: Map.delete(state.retries, hash)} rec_why = store_put(state, hash, bin) if rec_why != nil do sub_why = case rec_why do {:cached, ttl} -> {:cached, ttl} _ -> {:req_by, hash} end value = SData.term_unbin bin for dep <- SData.Page.refs value do if :dets.lookup state.store, dep == [] do init_rec_pull(state, dep, sub_why, [conn_pid]) end end end state else state end {:not_found, key} -> if state.reqs[key] != nil do nretry = case state.retries[key] do nil -> 1 n -> n+1 end if nretry < @max_failures do ask_random_peers(state, key) %{state | retries: Map.put(state.retries, key, nretry)} else for pid <- state.reqs[key] do GenServer.reply(pid, nil) end state = %{state | reqs: Map.delete(state.reqs, key)} state = %{state | retries: Map.delete(state.retries, key)} state end else state end end {:noreply, state} end end def handle_cast({:set_roots, roots}, state) do cached_why = {:cached, System.os_time(:seconds) + @cache_ttl} # Set old roots and their deps as cached for ent <- :dets.select state.store, [{ {:"$1", :root, :"$2"}, [], [:"$$"] }, { {:"$1", :root}, [], [:"$$"] }, { {:"$1", {:req_by, :_}, :"$2"}, [], [:"$$"] }, { {:"$1", {:req_by, :_}}, [], [:"$$"] }] do case ent do [id, bin] -> :dets.insert state.store, {id, cached_why, bin} [id] -> :dets.insert state.store, {id, cached_why} end end # Set new roots as roots for root <- roots do case :dets.lookup state.store, root do [{^root, _, bin}] -> :dets.insert state.store, {root, :root, bin} rec_set_dep(state, root, SData.term_unbin bin) [{^root, _}] -> :dets.insert state.store, {root, :root} [] -> init_rec_pull state, root, :root, [] end end {:noreply, state} end defp rec_set_dep(state, hash, val0) do for dep <- SData.Page.refs val0 do case :dets.lookup state.store, dep do [{^dep, _, bin}] -> :dets.insert state.store, {dep, {:req_by, hash}, bin} rec_set_dep(state, dep, SData.term_unbin bin) [{^dep, _}] -> :dets.insert state.store, {dep, {:req_by, hash}} [] -> init_rec_pull state, dep, {:req_by, hash}, [] end end end def handle_info(:clean_cache, state) do currtime = System.os_time :seconds cache_cleanup = [ {{:_, {:cached, :'$1'}, :_}, [{:<, :'$1', currtime}], [true]}, {{:_, {:cached, :'$1'}}, [{:<, :'$1', currtime}], [true]} ] :dets.select_delete(state.store, cache_cleanup) Process.send_after(self(), :clean_cache, @clean_cache_every * 1000) {:noreply, state} end def ask_random_peers(state, key) do SNet.Group.broadcast(state.netgroup, {state.shard_id, state.path, {:get, key}}, nmax: 3) end defimpl SData.PageStore do def put(store, page) do bin = SData.term_bin page hash = GenServer.call(store.pid, {:put, bin}) { hash, store } end def get(store, hash) do try do case GenServer.call(store.pid, {:get, hash, store.prefer_ask}) do nil -> nil bin -> SData.term_unbin bin end catch :exit, {:timeout, _} -> nil end end def copy(store, other_store, hash) do GenServer.cast(store.pid, {:rec_pull, hash, other_store.prefer_ask}) store end def free(store, _hash) do store ## DO SOMETHING??? end end end