aboutsummaryrefslogtreecommitdiff
path: root/shard/lib/app/blockstore.ex
diff options
context:
space:
mode:
Diffstat (limited to 'shard/lib/app/blockstore.ex')
-rw-r--r--shard/lib/app/blockstore.ex149
1 files changed, 149 insertions, 0 deletions
diff --git a/shard/lib/app/blockstore.ex b/shard/lib/app/blockstore.ex
new file mode 100644
index 0000000..8e4fddc
--- /dev/null
+++ b/shard/lib/app/blockstore.ex
@@ -0,0 +1,149 @@
+defmodule SApp.BlockStore do
+ @moduledoc """
+ A module that implements a content-adressable storage (blocks, or pages,
+ identified by the hash of their contents).
+
+ This is not a shard, it is a side process that a shard may use to store its data.
+
+ TODO: WIP
+ """
+
+ use GenServer
+
+ @enforce_keys [:pid]
+ defstruct [:pid, :prefer_ask]
+
+
+ defmodule State do
+ defstruct [:shard_id, :path, :store, :reqs, :retries]
+ end
+
+
+ def start_link(shard_id, path) do
+ GenServer.start_link(__MODULE__, [shard_id, path])
+ end
+
+ def init([shard_id, path]) do
+ Shard.Manager.dispatch_to(shard_id, path, self())
+
+ {:ok, %State{shard_id: shard_id, path: path, store: %{}, reqs: %{}, retries: %{}}}
+ end
+
+ def handle_call({:get, key, prefer_ask}, from, state) do
+ case state.store[key] do
+ nil ->
+ case prefer_ask do
+ [_ | _] ->
+ for peer <- prefer_ask do
+ Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
+ end
+ _ ->
+ ask_random_peers(state, key)
+ end
+ reqs_key = case state.reqs[key] do
+ nil ->
+ MapSet.put(MapSet.new(), from)
+ ms ->
+ MapSet.put(ms, from)
+ end
+ state = put_in(state.reqs[key], reqs_key)
+ {:noreply, state}
+ v ->
+ {:reply, v, state}
+ end
+ end
+
+ def handle_call({:put, val}, _from, state) do
+ hash = SData.term_hash val
+ state = %{state | store: Map.put(state.store, hash, val)}
+ {:reply, hash, state}
+ end
+
+ def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do
+ state = case msg do
+ {:get, key} ->
+ case state.store[key] do
+ nil ->
+ Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}})
+ v ->
+ Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, v}})
+ end
+ state
+ {:info, hash, value} ->
+ if SData.term_hash value == hash do
+ reqs = case state.reqs[hash] do
+ nil -> state.reqs
+ pids ->
+ for pid <- pids do
+ GenServer.reply(pid, value)
+ end
+ Map.delete(state.reqs, hash)
+ end
+ state = %{state | retries: Map.delete(state.retries, hash)}
+ %{state | store: Map.put(state.store, hash, value), reqs: reqs}
+ else
+ state
+ end
+ {:not_found, key} ->
+ if state.reqs[key] != nil and state.store[key] == nil do
+ nretry = case state.retries[key] do
+ nil -> 1
+ n -> n+1
+ end
+ if nretry < 3 do
+ ask_random_peers(state, key)
+ %{state | retries: Map.put(state.retries, key, nretry)}
+ else
+ for pid <- state.reqs[key] do
+ GenServer.reply(pid, nil)
+ end
+ state = %{state | reqs: Map.delete(state.reqs, key)}
+ state = %{state | retries: Map.delete(state.retries, key)}
+ state
+ end
+ else
+ state
+ end
+ end
+ {:noreply, state}
+ end
+
+ def ask_random_peers(state, key) do
+ peers = :ets.lookup(:shard_peer_db, state.shard_id)
+ |> Enum.shuffle
+ |> Enum.take(3)
+ for {_, peer} <- peers do
+ Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
+ end
+ end
+
+
+ defimpl SData.PageStore do
+ def put(store, page) do
+ hash = GenServer.call(store.pid, {:put, page})
+ { hash, store }
+ end
+
+ def get(store, hash) do
+ try do
+ GenServer.call(store.pid, {:get, hash, store.prefer_ask})
+ catch
+ :exit, {:timeout, _} -> nil
+ end
+ end
+
+ def copy(store, other_store, hash) do
+ page = SData.PageStore.get(other_store, hash)
+ refs = SData.Page.refs(page)
+ for ref <- refs do
+ copy(store, other_store, ref)
+ end
+ GenServer.call(store.pid, {:put, page})
+ store
+ end
+
+ def free(store, _hash) do
+ store ## DO SOMETHING???
+ end
+ end
+end