defmodule SApp.BlockStore do
  @moduledoc """
  A module that implements a content-addressable storage (blocks, or pages,
  identified by the hash of their contents).

  This is not a shard, it is a side process that a shard may use to store its data.

  The struct defined by this module (`:pid`, `:prefer_ask`) is the client-side
  handle; it implements `SData.PageStore` by calling the server process at `:pid`.

  TODO: WIP
  """

  use GenServer

  @enforce_keys [:pid]
  defstruct [:pid, :prefer_ask]

  defmodule State do
    @moduledoc """
    Server state of a block store process.

      * `:shard_id`, `:path` - identify this process in messages sent through `Shard.Manager`
      * `:store` - map of `hash => value` for the blocks held locally
      * `:reqs` - map of `hash => MapSet` of callers (`from` tuples) waiting for that block
      * `:retries` - map of `hash => number of retries already attempted`
    """
    defstruct [:shard_id, :path, :store, :reqs, :retries]
  end

  @doc """
  Starts a block store process for the given `shard_id` and `path`.
  """
  def start_link(shard_id, path) do
    GenServer.start_link(__MODULE__, [shard_id, path])
  end

  # Registers this process with Shard.Manager for `{shard_id, path}` and starts
  # with an empty store and no pending requests.
  @impl true
  def init([shard_id, path]) do
    Shard.Manager.dispatch_to(shard_id, path, self())
    {:ok, %State{shard_id: shard_id, path: path, store: %{}, reqs: %{}, retries: %{}}}
  end

  # {:get, key, prefer_ask}: replies immediately when the block is held locally.
  # Otherwise asks the peers in `prefer_ask` (or random peers when that is not a
  # non-empty list), records the caller as waiting and defers the reply until an
  # `:info` or a final `:not_found` message arrives.
  @impl true
  def handle_call({:get, key, prefer_ask}, from, state) do
    case state.store[key] do
      nil ->
        case prefer_ask do
          [_ | _] ->
            for peer <- prefer_ask do
              Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
            end

          _ ->
            ask_random_peers(state, key)
        end

        reqs = Map.update(state.reqs, key, MapSet.new([from]), &MapSet.put(&1, from))
        {:noreply, %{state | reqs: reqs}}

      v ->
        {:reply, v, state}
    end
  end

  # {:put, val}: stores `val` under the hash of its contents and replies with that hash.
  def handle_call({:put, val}, _from, state) do
    hash = SData.term_hash(val)
    state = %{state | store: Map.put(state.store, hash, val)}
    {:reply, hash, state}
  end

  # Handles a message `msg` received from peer `peer_id`:
  #
  #   * `{:get, key}` - answers with `{:info, key, value}` or `{:not_found, key}`
  #   * `{:info, hash, value}` - stores the block and answers all waiting callers,
  #     provided `value` really hashes to `hash`
  #   * `{:not_found, key}` - retries with random peers, or gives up and answers
  #     `nil` to the waiting callers once the retry counter reaches 3
  @impl true
  def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do
    state =
      case msg do
        {:get, key} ->
          case state.store[key] do
            nil ->
              Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}})

            v ->
              Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, v}})
          end

          state

        {:info, hash, value} ->
          # The parentheses are essential: `SData.term_hash value == hash` parses as
          # `SData.term_hash(value == hash)`, which would make this check always pass
          # and let a peer store arbitrary data under any hash.
          if SData.term_hash(value) == hash do
            reqs =
              case state.reqs[hash] do
                nil ->
                  state.reqs

                waiting ->
                  for from <- waiting do
                    GenServer.reply(from, value)
                  end

                  Map.delete(state.reqs, hash)
              end

            %{
              state
              | store: Map.put(state.store, hash, value),
                reqs: reqs,
                retries: Map.delete(state.retries, hash)
            }
          else
            # Content does not match the announced hash: ignore it.
            state
          end

        {:not_found, key} ->
          # Only relevant while someone is still waiting for a block we do not hold.
          if state.reqs[key] != nil and state.store[key] == nil do
            nretry = Map.get(state.retries, key, 0) + 1

            if nretry < 3 do
              ask_random_peers(state, key)
              %{state | retries: Map.put(state.retries, key, nretry)}
            else
              for from <- state.reqs[key] do
                GenServer.reply(from, nil)
              end

              %{
                state
                | reqs: Map.delete(state.reqs, key),
                  retries: Map.delete(state.retries, key)
              }
            end
          else
            state
          end
      end

    {:noreply, state}
  end

  @doc """
  Sends a `{:get, key}` request to up to three randomly chosen entries of the
  `:shard_peer_db` ETS table for this shard.
  """
  def ask_random_peers(state, key) do
    # NOTE(review): :ets.lookup/2 returns the stored tuples as-is; confirm that
    # Shard.Manager.send/2 accepts them as peer identifiers.
    peers =
      :shard_peer_db
      |> :ets.lookup(state.shard_id)
      |> Enum.shuffle()
      |> Enum.take(3)

    for peer <- peers do
      Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
    end
  end

  defimpl SData.PageStore do
    # Stores `page` in the server process; returns `{hash, store}`.
    def put(store, page) do
      hash = GenServer.call(store.pid, {:put, page})
      {hash, store}
    end

    # Fetches the page for `hash`; returns `nil` when the call times out
    # (the server also replies `nil` once it has given up asking peers).
    def get(store, hash) do
      GenServer.call(store.pid, {:get, hash, store.prefer_ask})
    catch
      :exit, {:timeout, _} -> nil
    end

    # Copies the page `hash` from `other_store`, preceded by every page it
    # references (recursively), into `store`. Returns `store`.
    # NOTE(review): `get/2` may return nil; behaviour of SData.Page.refs/1 on nil
    # is not visible here - confirm whether a missing page must be handled.
    def copy(store, other_store, hash) do
      page = SData.PageStore.get(other_store, hash)

      for ref <- SData.Page.refs(page) do
        copy(store, other_store, ref)
      end

      GenServer.call(store.pid, {:put, page})
      store
    end

    # Currently a no-op: pages are never removed from the store.
    def free(store, _hash) do
      store
      ## DO SOMETHING???
    end
  end
end