defmodule SApp.BlockStore do
@moduledoc """
A module that implements a content-addressable storage (blocks, or pages,
identified by the hash of their contents).
This is not a shard, it is a side process that a shard may use to store its data.
TODO: WIP
"""
use GenServer
# Client-side handle to a block store process; this is the struct the
# SData.PageStore implementation at the bottom of this module operates on.
#   * :pid (required)  - the process that the `:put` / `:get` GenServer calls
#                        are sent to.
#   * :prefer_ask      - forwarded with every `:get` call; when it is a
#                        non-empty list, those peers are asked for a missing
#                        block, otherwise random shard peers are asked.
#                        Defaults to nil.
@enforce_keys [:pid]
defstruct [:pid, :prefer_ask]
defmodule State do
  # Internal state of the block store process. Every field defaults to nil;
  # init/1 of the enclosing module fills them in:
  #   * shard_id, path - identify this store; sent along with every peer message
  #   * store          - map of content hash => stored value
  #   * reqs           - map of hash => MapSet of callers waiting for that block
  #   * retries        - map of hash => number of :not_found replies seen so far
  defstruct shard_id: nil,
            path: nil,
            store: nil,
            reqs: nil,
            retries: nil
end
@doc """
Starts a block store process linked to the caller.

`shard_id` and `path` are handed to `init/1` as a two-element list.
Returns whatever `GenServer.start_link/2` returns.
"""
def start_link(shard_id, path),
  do: GenServer.start_link(__MODULE__, [shard_id, path])
# GenServer callback: builds the initial (empty) state and registers this
# process with Shard.Manager for the given shard id and path.
# NOTE(review): presumably dispatch_to/3 is what makes peer messages arrive
# here as `{:msg, ...}` casts - confirm against Shard.Manager.
def init([shard_id, path]) do
  initial_state = %State{
    shard_id: shard_id,
    path: path,
    store: %{},
    reqs: %{},
    retries: %{}
  }

  Shard.Manager.dispatch_to(shard_id, path, self())
  {:ok, initial_state}
end
# GenServer callback for `{:get, key, prefer_ask}`.
#
# If the block is stored locally it is returned right away. Otherwise the
# block is requested from peers (the `prefer_ask` list when it is a non-empty
# list, random shard peers otherwise) and the caller is parked in `reqs`;
# the reply is sent later from handle_cast/2 via GenServer.reply/2.
def handle_call({:get, key, prefer_ask}, from, state) do
  case Map.get(state.store, key) do
    nil ->
      if match?([_ | _], prefer_ask) do
        request = {state.shard_id, state.path, {:get, key}}
        Enum.each(prefer_ask, fn peer -> Shard.Manager.send(peer, request) end)
      else
        ask_random_peers(state, key)
      end

      # Add this caller to the set of processes waiting for `key`.
      waiting =
        case Map.get(state.reqs, key) do
          nil -> MapSet.new([from])
          already_waiting -> MapSet.put(already_waiting, from)
        end

      {:noreply, %{state | reqs: Map.put(state.reqs, key, waiting)}}

    value ->
      {:reply, value, state}
  end
end
# GenServer callback for `{:put, val}`: stores `val` under the hash of its
# contents (as computed by SData.term_hash) and replies with that hash.
def handle_call({:put, val}, _from, state) do
  content_hash = SData.term_hash(val)
  {:reply, content_hash, put_in(state.store[content_hash], val)}
end
# Number of `:not_found` replies for a key after which the callers waiting
# for it are answered with nil instead of asking more peers.
@max_not_found 3

# GenServer callback for messages coming from a peer. Three peer messages
# are understood:
#   * {:get, key}          - a peer asks for a block; answered with
#                            {:info, key, value} or {:not_found, key}
#   * {:info, hash, value} - a peer sends a block; stored and delivered to
#                            waiting callers if it matches its hash
#   * {:not_found, key}    - a peer does not have a block we asked for
# NOTE(review): any other message raises (FunctionClauseError) and takes the
# process down, as the original `case` did - confirm this is intended for
# input coming from remote peers.
def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do
  {:noreply, handle_peer_msg(msg, peer_id, state)}
end

# Answers a peer's request for a block; the state is left untouched.
defp handle_peer_msg({:get, key}, peer_id, state) do
  answer =
    case state.store[key] do
      nil -> {:not_found, key}
      value -> {:info, key, value}
    end

  Shard.Manager.send(peer_id, {state.shard_id, state.path, answer})
  state
end

# Accepts a block sent by a peer, but only if its content really hashes to
# the announced hash; anything else is silently dropped.
defp handle_peer_msg({:info, hash, value}, _peer_id, state) do
  # The parentheses matter: `SData.term_hash value == hash` parses as
  # `SData.term_hash(value == hash)`, which made this check accept any value.
  if SData.term_hash(value) == hash do
    {waiting, reqs} = Map.pop(state.reqs, hash, [])
    Enum.each(waiting, fn from -> GenServer.reply(from, value) end)

    %{
      state
      | store: Map.put(state.store, hash, value),
        reqs: reqs,
        retries: Map.delete(state.retries, hash)
    }
  else
    state
  end
end

# A peer does not have the block. Only relevant while somebody is still
# waiting for it and it has not arrived in the meantime: ask other random
# peers, or give up and answer nil once @max_not_found replies were counted.
defp handle_peer_msg({:not_found, key}, _peer_id, state) do
  waiting = state.reqs[key]

  if waiting != nil and state.store[key] == nil do
    not_found_count = Map.get(state.retries, key, 0) + 1

    if not_found_count < @max_not_found do
      ask_random_peers(state, key)
      %{state | retries: Map.put(state.retries, key, not_found_count)}
    else
      Enum.each(waiting, fn from -> GenServer.reply(from, nil) end)

      %{
        state
        | reqs: Map.delete(state.reqs, key),
          retries: Map.delete(state.retries, key)
      }
    end
  else
    state
  end
end
@doc """
Sends a `{:get, key}` request to at most three randomly chosen peers of
the shard `state.shard_id`.

Returns the list of results of the individual `Shard.Manager.send/2` calls.
"""
def ask_random_peers(state, key) do
  request = {state.shard_id, state.path, {:get, key}}

  state.shard_id
  |> Shard.Manager.get_shard_peers()
  |> Enum.shuffle()
  |> Enum.take(3)
  |> Enum.map(fn peer -> Shard.Manager.send(peer, request) end)
end
defimpl SData.PageStore do
  # SData.PageStore implementation backed by a block store process
  # (`store.pid`). The handle itself is never modified: every function that
  # returns a store returns the one it was given.

  # Stores `page` and returns `{hash, store}`.
  def put(store, page), do: {GenServer.call(store.pid, {:put, page}), store}

  # Fetches the page stored under `hash`. Returns nil when the call times
  # out; the server also replies nil once it has given up asking peers.
  def get(store, hash) do
    GenServer.call(store.pid, {:get, hash, store.prefer_ask})
  catch
    :exit, {:timeout, _} -> nil
  end

  # Copies the page `hash` from `other_store` into `store`, first copying
  # (recursively) every page it references, then the page itself.
  # NOTE(review): when the page cannot be fetched, `page` is nil and is passed
  # on to SData.Page.refs/1 and to `:put` as is - confirm this is acceptable.
  def copy(store, other_store, hash) do
    page = SData.PageStore.get(other_store, hash)

    page
    |> SData.Page.refs()
    |> Enum.each(fn ref -> copy(store, other_store, ref) end)

    GenServer.call(store.pid, {:put, page})
    store
  end

  # Currently a no-op: nothing is removed from the store.
  def free(store, _hash), do: store ## DO SOMETHING???
end
end