aboutsummaryrefslogblamecommitdiff
path: root/shard/lib/app/blockstore.ex
blob: 5e9313566e1f184ab589bf75c2710fcb51f9f143 (plain) (tree)














































































































                                                                                        
                                                         

                     
                        











                                                                         




                                                                 
















                                                   
defmodule SApp.BlockStore do
  @moduledoc """
  A module that implements content-addressable storage (blocks, or pages,
  identified by the hash of their contents).

  This is not a shard, it is a side process that a shard may use to store its data.

  TODO: WIP
  """

  use GenServer

  # Client-side handle used by the SData.PageStore implementation below.
  #   pid        - the block store GenServer that calls are sent to (required)
  #   prefer_ask - optional list of peers to query first on a cache miss; when
  #                it is not a non-empty list, random shard peers are asked
  @enforce_keys [:pid]
  defstruct pid: nil, prefer_ask: nil


  defmodule State do
    @moduledoc """
    Internal state of the block store process.

      * `shard_id`, `path` - identify this store when talking to `Shard.Manager`
      * `store` - map from block hash to block value
      * `reqs` - map from block hash to the set of callers waiting for it
      * `retries` - map from block hash to the number of `:not_found` answers seen
    """

    defstruct shard_id: nil, path: nil, store: nil, reqs: nil, retries: nil
  end


  @doc """
  Starts a block store process linked to the caller.

  `shard_id` and `path` are handed to `init/1`; the return value is whatever
  `GenServer.start_link/2` returns.
  """
  def start_link(shard_id, path) do
    init_args = [shard_id, path]
    GenServer.start_link(__MODULE__, init_args)
  end

  # GenServer init callback. Announces this process to Shard.Manager for
  # {shard_id, path} (presumably so that peer messages for that path are routed
  # here - confirm against Shard.Manager) and starts with empty tables.
  def init([shard_id, path]) do
    Shard.Manager.dispatch_to(shard_id, path, self())

    initial_state = %State{
      shard_id: shard_id,
      path: path,
      store: %{},
      reqs: %{},
      retries: %{}
    }

    {:ok, initial_state}
  end

  # Looks up the block stored under `key`.
  #
  # Hit:  replies immediately with the stored value.
  # Miss: sends a {:get, key} request to every peer in `prefer_ask` when it is
  #       a non-empty list, otherwise to random shard peers, then parks the
  #       caller in `reqs` without replying. The reply is sent later from
  #       handle_cast/2.
  def handle_call({:get, key, prefer_ask}, from, state) do
    case state.store[key] do
      nil ->
        if match?([_ | _], prefer_ask) do
          Enum.each(prefer_ask, fn peer ->
            Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
          end)
        else
          ask_random_peers(state, key)
        end

        # Several callers may be waiting on the same key; keep them all.
        already_waiting = state.reqs[key] || MapSet.new()
        waiting = MapSet.put(already_waiting, from)

        {:noreply, %{state | reqs: Map.put(state.reqs, key, waiting)}}

      found ->
        {:reply, found, state}
    end
  end

  # Stores `val` under the hash computed by SData.term_hash/1 and replies with
  # that hash.
  def handle_call({:put, val}, _from, state) do
    hash = SData.term_hash(val)
    updated_store = Map.put(state.store, hash, val)
    {:reply, hash, %{state | store: updated_store}}
  end

  # Handles a message `msg` received from the peer `peer_id`.
  #
  #   {:get, key}         - answer with {:info, key, value} if the block is
  #                         stored locally, otherwise with {:not_found, key}.
  #   {:info, hash, val}  - a peer sent us a block. It is accepted only if the
  #                         hash of `val` equals `hash`; it is then stored and
  #                         every caller waiting on `hash` gets `val` as reply.
  #   {:not_found, key}   - a peer does not have the block. While callers are
  #                         still waiting, other random peers are asked; on the
  #                         third :not_found answer the waiters are replied `nil`.
  #   anything else       - ignored.
  #
  # Always returns {:noreply, new_state}.
  def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do
    state =
      case msg do
        {:get, key} ->
          answer =
            case state.store[key] do
              nil -> {:not_found, key}
              v -> {:info, key, v}
            end

          Shard.Manager.send(peer_id, {state.shard_id, state.path, answer})
          state

        {:info, hash, value} ->
          # The parentheses matter: `SData.term_hash value == hash` parses as
          # `SData.term_hash(value == hash)`, i.e. it hashes the boolean result
          # of the comparison and never checks the content against `hash`.
          if SData.term_hash(value) == hash do
            {waiting, reqs} = Map.pop(state.reqs, hash, [])
            Enum.each(waiting, &GenServer.reply(&1, value))

            %{
              state
              | store: Map.put(state.store, hash, value),
                reqs: reqs,
                retries: Map.delete(state.retries, hash)
            }
          else
            # Content does not match the announced hash: drop it.
            state
          end

        {:not_found, key} ->
          # Only relevant while someone is waiting and we still lack the block.
          if state.reqs[key] != nil and state.store[key] == nil do
            nretry = Map.get(state.retries, key, 0) + 1

            if nretry < 3 do
              ask_random_peers(state, key)
              %{state | retries: Map.put(state.retries, key, nretry)}
            else
              # Give up: tell every waiting caller the block was not found.
              Enum.each(state.reqs[key], &GenServer.reply(&1, nil))

              %{
                state
                | reqs: Map.delete(state.reqs, key),
                  retries: Map.delete(state.retries, key)
              }
            end
          else
            state
          end

        _unknown ->
          # Messages come from remote peers; an unrecognised one must not
          # crash the process (and with it the in-memory store).
          state
      end

    {:noreply, state}
  end

  @doc """
  Sends a `{:get, key}` request to at most three randomly chosen peers of the
  shard `state.shard_id`.

  Returns the list of results of the individual `Shard.Manager.send/2` calls.
  """
  def ask_random_peers(state, key) do
    state.shard_id
    |> Shard.Manager.get_shard_peers()
    |> Enum.shuffle()
    |> Enum.take(3)
    |> Enum.map(fn peer ->
      Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
    end)
  end


  defimpl SData.PageStore do
    # Stores `page` in the block store process; returns {hash, store}.
    def put(store, page) do
      {GenServer.call(store.pid, {:put, page}), store}
    end

    # Fetches the page identified by `hash` from the block store process.
    # Returns nil when the call times out; the process itself also replies nil
    # once it gives up asking peers.
    def get(store, hash) do
      request = {:get, hash, store.prefer_ask}

      try do
        GenServer.call(store.pid, request)
      catch
        :exit, {:timeout, _} -> nil
      end
    end

    # Copies the page `hash` from `other_store` into `store`, after first
    # copying every page it references (depth-first). Returns `store`.
    # NOTE(review): `get` may return nil (timeout / not found); what
    # SData.Page.refs/1 does with nil is not visible here - confirm.
    def copy(store, other_store, hash) do
      page = SData.PageStore.get(other_store, hash)

      page
      |> SData.Page.refs()
      |> Enum.each(&copy(store, other_store, &1))

      GenServer.call(store.pid, {:put, page})
      store
    end

    # Currently a no-op: nothing is released, the store is returned unchanged.
    def free(store, _hash) do
      store ## DO SOMETHING???
    end
  end
end