aboutsummaryrefslogtreecommitdiff
path: root/shard/lib/app/pagestore.ex
diff options
context:
space:
mode:
Diffstat (limited to 'shard/lib/app/pagestore.ex')
-rw-r--r--shard/lib/app/pagestore.ex269
1 files changed, 269 insertions, 0 deletions
diff --git a/shard/lib/app/pagestore.ex b/shard/lib/app/pagestore.ex
new file mode 100644
index 0000000..5f2ba54
--- /dev/null
+++ b/shard/lib/app/pagestore.ex
@@ -0,0 +1,269 @@
+defmodule SApp.PageStore do
+ @moduledoc """
+ A module that implements a content-adressable storage (pages,
+ identified by the hash of their contents).
+
+ This is not a shard, it is a side process that a shard may use to store its data.
+
+ Uses an ETS table of:
+
+ { page_id, why_have_it } -- waiting for data
+ { page_id, why_have_it, data } -- once we have the data
+
+ why_have_it := :root
+ | {:req_by, some_other_page_id}
+ | {:cached, expiry_date}
+ """
+
+ use GenServer
+
+ @enforce_keys [:pid]
+ defstruct [:pid, :prefer_ask]
+
+ @cache_ttl 600 # Ten minutes
+ @clean_cache_every 60 # One minute
+ @max_failures 4 # Maximum of peers that reply not_found before we abandon
+
+ defmodule State do
+ defstruct [:shard_id, :path, :store, :reqs, :retries]
+ end
+
+
+ def start_link(shard_id, path) do
+ GenServer.start_link(__MODULE__, [shard_id, path])
+ end
+
+ def init([shard_id, path]) do
+ Shard.Manager.dispatch_to(shard_id, path, self())
+
+ store_path = String.to_atom "#{Base.encode16 shard_id}/#{Atom.to_string path}"
+ store = :ets.new store_path, [:set, :protected]
+
+ Process.send_after(self(), :clean_cache, @clean_cache_every * 1000)
+
+ {:ok, %State{shard_id: shard_id, path: path, store: store, reqs: %{}, retries: %{}}}
+ end
+
+
+ def handle_call({:get, key, prefer_ask}, from, state) do
+ case :ets.lookup state.store, key do
+ [{_, _, bin}] ->
+ {:reply, bin, state}
+ [{_, _}] ->
+ state = add_request(state, key, from)
+ {:noreply, state}
+ [] ->
+ why = {:cached, System.os_time(:seconds) + @cache_ttl}
+ init_rec_pull(state, key, why, prefer_ask)
+ state = add_request(state, key, from)
+ {:noreply, state}
+ end
+ end
+
+ def handle_call({:put, bin}, _from, state) do
+ hash = SData.bin_hash bin
+ store_put(state, hash, bin)
+ {:reply, hash, state}
+ end
+
+ defp add_request(state, key, from) do
+ reqs_key = case state.reqs[key] do
+ nil ->
+ MapSet.put(MapSet.new(), from)
+ ms ->
+ MapSet.put(ms, from)
+ end
+ put_in(state.reqs[key], reqs_key)
+ end
+
+ defp store_put(state, hash, bin) do
+ case :ets.lookup state.store, hash do
+ [] ->
+ :ets.insert state.store, {hash, {:cached, System.os_time(:seconds) + @cache_ttl}, bin}
+ nil
+ [{_, why}] ->
+ :ets.insert state.store, {hash, why, bin}
+ why
+ [{_, _, _}] ->
+ nil
+ end
+ end
+
+ defp init_rec_pull(state, key, why, prefer_ask) do
+ case prefer_ask do
+ [] ->
+ ask_random_peers(state, key)
+ _ ->
+ for peer <- prefer_ask do
+ Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
+ end
+ end
+ :ets.insert state.store, {key, why}
+ end
+
+ def handle_cast({:rec_pull, hash, ask_to}, state) do
+ if :ets.lookup state.store, hash == [] do
+ why = {:cached, System.os_time(:seconds) + @cache_ttl}
+ init_rec_pull(state, hash, why, ask_to)
+ end
+ {:noreply, state}
+ end
+
+ def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do
+ state = case msg do
+ {:get, key} ->
+ case :ets.lookup state.store, key do
+ [{_, _, bin}] ->
+ Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, bin}})
+ _ ->
+ Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}})
+ end
+ state
+ {:info, hash, bin} ->
+ already_have_it = case :ets.lookup state.store, hash do
+ [{_, _, _}] -> true
+ _ -> false
+ end
+ if SData.bin_hash bin == hash and not already_have_it do
+ reqs = case state.reqs[hash] do
+ nil -> state.reqs
+ pids ->
+ for pid <- pids do
+ GenServer.reply(pid, bin)
+ end
+ Map.delete(state.reqs, hash)
+ end
+ state = %{state | reqs: reqs, retries: Map.delete(state.retries, hash)}
+ rec_why = store_put(state, hash, bin)
+ if rec_why != nil do
+ sub_why = case rec_why do
+ {:cached, ttl} -> {:cached, ttl}
+ _ -> {:req_by, hash}
+ end
+ value = SData.term_unbin bin
+ for dep <- SData.Page.refs value do
+ if :ets.lookup state.store, dep == [] do
+ init_rec_pull(state, dep, sub_why, [peer_id])
+ end
+ end
+ end
+ state
+ else
+ state
+ end
+ {:not_found, key} ->
+ if state.reqs[key] != nil do
+ nretry = case state.retries[key] do
+ nil -> 1
+ n -> n+1
+ end
+ if nretry < @max_failures do
+ ask_random_peers(state, key)
+ %{state | retries: Map.put(state.retries, key, nretry)}
+ else
+ for pid <- state.reqs[key] do
+ GenServer.reply(pid, nil)
+ end
+ state = %{state | reqs: Map.delete(state.reqs, key)}
+ state = %{state | retries: Map.delete(state.retries, key)}
+ state
+ end
+ else
+ state
+ end
+ end
+ {:noreply, state}
+ end
+
+ def handle_cast({:set_roots, roots}, state) do
+ cached_why = {:cached, System.os_time(:seconds) + @cache_ttl}
+
+ # Set old roots and their deps as cached
+ for ent <- :ets.select state.store, [{ {:"$1", :root, :"$2"}, [], [:"$$"] },
+ { {:"$1", :root}, [], [:"$$"] },
+ { {:"$1", {:req_by, :_}, :"$2"}, [], [:"$$"] },
+ { {:"$1", {:req_by, :_}}, [], [:"$$"] }]
+ do
+ case ent do
+ [id, bin] ->
+ :ets.insert state.store, {id, cached_why, bin}
+ [id] ->
+ :ets.insert state.store, {id, cached_why}
+ end
+ end
+
+ # Set new roots as roots
+ for root <- roots do
+ case :ets.lookup state.store, root do
+ [{^root, _, bin}] ->
+ :ets.insert state.store, {root, :root, bin}
+ rec_set_dep(state, root, SData.term_unbin bin)
+ [{^root, _}] ->
+ :ets.insert state.store, {root, :root}
+ [] ->
+ init_rec_pull state, root, :root, []
+ end
+ end
+ {:noreply, state}
+ end
+
+ defp rec_set_dep(state, hash, val0) do
+ for dep <- SData.Page.refs val0 do
+ case :ets.lookup state.store, dep do
+ [{^dep, _, bin}] ->
+ :ets.insert state.store, {dep, {:req_by, hash}, bin}
+ rec_set_dep(state, dep, SData.term_unbin bin)
+ [{^dep, _}] ->
+ :ets.insert state.store, {dep, {:req_by, hash}}
+ [] ->
+ init_rec_pull state, dep, {:req_by, hash}, []
+ end
+ end
+ end
+
+ def handle_info(:clean_cache, state) do
+ currtime = System.os_time :seconds
+
+ cache_cleanup = [ {{:_, {:cached, :'$1'}, :_}, [{:<, :'$1', currtime}], [true]},
+ {{:_, {:cached, :'$1'}}, [{:<, :'$1', currtime}], [true]} ]
+ :ets.select_delete(state.store, cache_cleanup)
+
+ Process.send_after(self(), :clean_cache, @clean_cache_every * 1000)
+ {:noreply, state}
+ end
+
+ def ask_random_peers(state, key) do
+ peers = Shard.Manager.get_shard_peers(state.shard_id)
+ |> Enum.shuffle
+ |> Enum.take(3)
+ for peer <- peers do
+ Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
+ end
+ end
+
+ defimpl SData.PageStore do
+ def put(store, page) do
+ bin = SData.term_bin page
+ hash = GenServer.call(store.pid, {:put, bin})
+ { hash, store }
+ end
+
+ def get(store, hash) do
+ try do
+ bin = GenServer.call(store.pid, {:get, hash, store.prefer_ask})
+ SData.term_unbin bin
+ catch
+ :exit, {:timeout, _} -> nil
+ end
+ end
+
+ def copy(store, other_store, hash) do
+ GenServer.cast(store.pid, {:rec_pull, hash, other_store.prefer_ask})
+ store
+ end
+
+ def free(store, _hash) do
+ store ## DO SOMETHING???
+ end
+ end
+end