diff options
Diffstat (limited to 'shard/lib/app/blockstore.ex')
-rw-r--r-- | shard/lib/app/blockstore.ex | 149 |
1 files changed, 149 insertions, 0 deletions
diff --git a/shard/lib/app/blockstore.ex b/shard/lib/app/blockstore.ex new file mode 100644 index 0000000..8e4fddc --- /dev/null +++ b/shard/lib/app/blockstore.ex @@ -0,0 +1,149 @@ +defmodule SApp.BlockStore do + @moduledoc """ + A module that implements a content-adressable storage (blocks, or pages, + identified by the hash of their contents). + + This is not a shard, it is a side process that a shard may use to store its data. + + TODO: WIP + """ + + use GenServer + + @enforce_keys [:pid] + defstruct [:pid, :prefer_ask] + + + defmodule State do + defstruct [:shard_id, :path, :store, :reqs, :retries] + end + + + def start_link(shard_id, path) do + GenServer.start_link(__MODULE__, [shard_id, path]) + end + + def init([shard_id, path]) do + Shard.Manager.dispatch_to(shard_id, path, self()) + + {:ok, %State{shard_id: shard_id, path: path, store: %{}, reqs: %{}, retries: %{}}} + end + + def handle_call({:get, key, prefer_ask}, from, state) do + case state.store[key] do + nil -> + case prefer_ask do + [_ | _] -> + for peer <- prefer_ask do + Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) + end + _ -> + ask_random_peers(state, key) + end + reqs_key = case state.reqs[key] do + nil -> + MapSet.put(MapSet.new(), from) + ms -> + MapSet.put(ms, from) + end + state = put_in(state.reqs[key], reqs_key) + {:noreply, state} + v -> + {:reply, v, state} + end + end + + def handle_call({:put, val}, _from, state) do + hash = SData.term_hash val + state = %{state | store: Map.put(state.store, hash, val)} + {:reply, hash, state} + end + + def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do + state = case msg do + {:get, key} -> + case state.store[key] do + nil -> + Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}}) + v -> + Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, v}}) + end + state + {:info, hash, value} -> + if SData.term_hash value == hash do + reqs = case state.reqs[hash] do + nil -> state.reqs + pids -> + for pid <- pids do + GenServer.reply(pid, value) + end + Map.delete(state.reqs, hash) + end + state = %{state | retries: Map.delete(state.retries, hash)} + %{state | store: Map.put(state.store, hash, value), reqs: reqs} + else + state + end + {:not_found, key} -> + if state.reqs[key] != nil and state.store[key] == nil do + nretry = case state.retries[key] do + nil -> 1 + n -> n+1 + end + if nretry < 3 do + ask_random_peers(state, key) + %{state | retries: Map.put(state.retries, key, nretry)} + else + for pid <- state.reqs[key] do + GenServer.reply(pid, nil) + end + state = %{state | reqs: Map.delete(state.reqs, key)} + state = %{state | retries: Map.delete(state.retries, key)} + state + end + else + state + end + end + {:noreply, state} + end + + def ask_random_peers(state, key) do + peers = :ets.lookup(:shard_peer_db, state.shard_id) + |> Enum.shuffle + |> Enum.take(3) + for {_, peer} <- peers do + Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) + end + end + + + defimpl SData.PageStore do + def put(store, page) do + hash = GenServer.call(store.pid, {:put, page}) + { hash, store } + end + + def get(store, hash) do + try do + GenServer.call(store.pid, {:get, hash, store.prefer_ask}) + catch + :exit, {:timeout, _} -> nil + end + end + + def copy(store, other_store, hash) do + page = SData.PageStore.get(other_store, hash) + refs = SData.Page.refs(page) + for ref <- refs do + copy(store, other_store, ref) + end + GenServer.call(store.pid, {:put, page}) + store + end + + def free(store, _hash) do + store ## DO SOMETHING??? + end + end +end |