aboutsummaryrefslogblamecommitdiff
path: root/shard/lib/app/blockstore.ex
blob: f1140b2e6914bad30244faa6b0e3b4ab71abc501 (plain) (tree)
1
2
3
4
5
6
7






                                                                                   







                                                              






                               

                                       












                                                         





                                                                                        


                                                          

                                        
                          







                                                              




                                               
                               


                         










































                                                                                              


                                                                   

                                            
                                                                                      

                                                                                        












                                                                     










                                                           



                          
                                                                           




















                                                                      

























































                                                                                        

                                     
                                                         

                     
                        



                                                                         






                                                    




                                                                 


                                         
                                                                          







                              
defmodule SApp.BlockStore do
  @moduledoc """
  A module that implements a content-addressable storage (blocks, or pages,
  identified by the hash of their contents).

  This is not a shard, it is a side process that a shard may use to store its data.

  Uses an ETS table of:

    { block_id, why_have_it }         -- waiting for data
    { block_id, why_have_it, data }   -- once we have the data

  why_have_it := :root
               | {:req_by, some_other_block_id}
               | {:cached, expiry_date}

  `expiry_date` is an OS timestamp in seconds. A periodic cleanup deletes
  `:cached` entries whose expiry date has passed; `:root` and `{:req_by, _}`
  entries are kept until a later `:set_roots` demotes them to `:cached`.
  """

  use GenServer

  @enforce_keys [:pid]
  defstruct [:pid, :prefer_ask]

  @cache_ttl 600          # Ten minutes
  @clean_cache_every 60   # One minute

  defmodule State do
    @moduledoc false
    # shard_id - the shard this store belongs to
    # path     - atom identifying this store within the shard (sent in every message)
    # store    - ETS table holding the blocks (layout in the SApp.BlockStore moduledoc)
    # reqs     - %{block_id => MapSet of GenServer `from`s waiting for that block}
    # retries  - %{block_id => how many times peers were re-asked after :not_found}
    defstruct [:shard_id, :path, :store, :reqs, :retries]
  end

  @doc """
  Starts a block store for `shard_id`, registered with `Shard.Manager` under
  the sub-path `path` (an atom).
  """
  def start_link(shard_id, path) do
    GenServer.start_link(__MODULE__, [shard_id, path])
  end

  @impl true
  def init([shard_id, path]) do
    Shard.Manager.dispatch_to(shard_id, path, self())

    # The table is not a :named_table, so its name is only a label. Use a fixed
    # atom rather than building one per shard: atoms are never garbage-collected.
    store = :ets.new(__MODULE__, [:set, :protected])

    Process.send_after(self(), :clean_cache, @clean_cache_every * 1000)

    {:ok, %State{shard_id: shard_id, path: path, store: store, reqs: %{}, retries: %{}}}
  end

  # {:get, key, prefer_ask}: replies with the block's data at once if we hold it;
  # otherwise the reply is deferred until a peer sends the block (reply: data)
  # or the retries are exhausted (reply: nil).
  # {:put, val}: stores `val` locally and replies with its hash.
  @impl true
  def handle_call({:get, key, prefer_ask}, from, state) do
    case :ets.lookup(state.store, key) do
      [{_, _, v}] ->
        {:reply, v, state}

      [{_, _}] ->
        # A fetch is already in flight: just wait for its outcome.
        {:noreply, add_request(state, key, from)}

      [] ->
        init_rec_pull(state, key, fresh_cache_why(), prefer_ask)
        {:noreply, add_request(state, key, from)}
    end
  end

  def handle_call({:put, val}, _from, state) do
    hash = SData.term_hash(val)
    store_put(state, hash, val)
    {:reply, hash, state}
  end

  # {:rec_pull, hash, ask_to}: fetch `hash` (and, once it arrives, its
  # dependencies) as a cached entry, unless we already know about it.
  @impl true
  def handle_cast({:rec_pull, hash, ask_to}, state) do
    # Parentheses matter: without them this compared `hash == []` and looked
    # that boolean up, so the guard was always truthy and clobbered stored data.
    if :ets.lookup(state.store, hash) == [] do
      init_rec_pull(state, hash, fresh_cache_why(), ask_to)
    end

    {:noreply, state}
  end

  # A message from peer `peer_id` routed to us by Shard.Manager.
  def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do
    {:noreply, handle_peer_msg(state, peer_id, msg)}
  end

  # Replaces the set of pinned blocks: every previously pinned entry becomes
  # cached, then each of `roots` (and everything it references) is pinned again,
  # requesting from peers whatever is missing.
  def handle_cast({:set_roots, roots}, state) do
    cached_why = fresh_cache_why()

    # Set old roots, then old deps, as cached (entries with data, then placeholders)
    for pattern <- [:root, {:req_by, :_}] do
      for [id, val] <- :ets.match(state.store, {:"$1", pattern, :"$2"}) do
        :ets.insert(state.store, {id, cached_why, val})
      end

      for [id] <- :ets.match(state.store, {:"$1", pattern}) do
        :ets.insert(state.store, {id, cached_why})
      end
    end

    # Set new roots as roots
    for root <- roots do
      case :ets.lookup(state.store, root) do
        [{^root, _, val}] ->
          :ets.insert(state.store, {root, :root, val})
          rec_set_dep(state, root, val, [])

        [{^root, _}] ->
          :ets.insert(state.store, {root, :root})

        [] ->
          init_rec_pull(state, root, :root, [])
      end
    end

    {:noreply, state}
  end

  # Periodically deletes cached entries (with or without data) whose expiry
  # date is in the past, then re-arms the timer.
  @impl true
  def handle_info(:clean_cache, state) do
    now = System.os_time(:second)

    # :ets.select_delete/2 removes an object only when the match-spec body
    # returns `true`; returning anything else (e.g. the timestamp) keeps it.
    expired_with_data = {{:_, {:cached, :"$1"}, :_}, [{:<, :"$1", now}], [true]}
    expired_waiting = {{:_, {:cached, :"$1"}}, [{:<, :"$1", now}], [true]}
    :ets.select_delete(state.store, [expired_with_data, expired_waiting])

    Process.send_after(self(), :clean_cache, @clean_cache_every * 1000)
    {:noreply, state}
  end

  @doc """
  Sends a `{:get, key}` request for block `key` to up to three peers of the
  shard, picked at random among `Shard.Manager.get_shard_peers/1`.
  """
  def ask_random_peers(state, key) do
    peers =
      state.shard_id
      |> Shard.Manager.get_shard_peers()
      |> Enum.shuffle()
      |> Enum.take(3)

    for peer <- peers do
      Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
    end
  end

  # Records that the caller `from` is waiting for block `key`.
  defp add_request(state, key, from) do
    reqs = Map.update(state.reqs, key, MapSet.new([from]), &MapSet.put(&1, from))
    %{state | reqs: reqs}
  end

  # Stores `val` under `hash`. Returns the `why` of the placeholder we were
  # waiting on, or nil when there was no placeholder (new cached entry, or the
  # data was already present and is left untouched).
  defp store_put(state, hash, val) do
    case :ets.lookup(state.store, hash) do
      [] ->
        :ets.insert(state.store, {hash, fresh_cache_why(), val})
        nil

      [{_, why}] ->
        :ets.insert(state.store, {hash, why, val})
        why

      [{_, _, _}] ->
        nil
    end
  end

  # Asks peers for `key` (the `prefer_ask` peers when given a non-empty list,
  # random peers otherwise) and records a placeholder tagged with `why`.
  # Overwrites any existing entry, so callers only use it for absent keys.
  defp init_rec_pull(state, key, why, prefer_ask) do
    case prefer_ask do
      [_ | _] ->
        for peer <- prefer_ask do
          Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
        end

      _ ->
        ask_random_peers(state, key)
    end

    :ets.insert(state.store, {key, why})
  end

  # A peer asks for a block: answer with its data or :not_found.
  defp handle_peer_msg(state, peer_id, {:get, key}) do
    reply =
      case :ets.lookup(state.store, key) do
        [{_, _, v}] -> {:info, key, v}
        _ -> {:not_found, key}
      end

    Shard.Manager.send(peer_id, {state.shard_id, state.path, reply})
    state
  end

  # A peer sends a block: accept it only if its content matches the hash.
  defp handle_peer_msg(state, peer_id, {:info, hash, value}) do
    # Parentheses matter: `SData.term_hash value == hash` hashed a boolean,
    # which is always truthy, so unverified data used to be accepted.
    if SData.term_hash(value) == hash do
      accept_block(state, peer_id, hash, value)
    else
      state
    end
  end

  # A peer does not have a block: retry with random peers a couple of times,
  # then give up and reply nil to everyone waiting for it.
  defp handle_peer_msg(state, _peer_id, {:not_found, key}) do
    if Map.has_key?(state.reqs, key) and not have_data?(state.store, key) do
      nretry = Map.get(state.retries, key, 0) + 1

      if nretry < 3 do
        ask_random_peers(state, key)
        %{state | retries: Map.put(state.retries, key, nretry)}
      else
        for from <- state.reqs[key], do: GenServer.reply(from, nil)
        drop_cached_placeholder(state.store, key)
        %{state | reqs: Map.delete(state.reqs, key), retries: Map.delete(state.retries, key)}
      end
    else
      state
    end
  end

  # Unknown message from a peer: ignore it rather than crash the store.
  defp handle_peer_msg(state, _peer_id, _msg), do: state

  # Stores a verified block, answers waiting callers and fetches its dependencies
  # according to why we wanted it.
  defp accept_block(state, peer_id, hash, value) do
    {waiting, reqs} = Map.pop(state.reqs, hash)
    for from <- waiting || [], do: GenServer.reply(from, value)

    case store_put(state, hash, value) do
      nil ->
        :ok

      {:cached, _} = why ->
        # Cached parent: fetch missing deps with the same expiry date, never
        # touching deps we already hold (they may be pinned).
        for dep <- SData.Page.refs(value), :ets.lookup(state.store, dep) == [] do
          init_rec_pull(state, dep, why, [peer_id])
        end

      _pinned ->
        # Pinned parent: pin the whole subtree, fetching what is missing from
        # the peer that just sent the parent.
        rec_set_dep(state, hash, value, [peer_id])
    end

    %{state | reqs: reqs, retries: Map.delete(state.retries, hash)}
  end

  # Marks every block referenced by `val0` as {:req_by, hash}, recursing into
  # those we hold and requesting (from `ask`, or random peers) those we lack.
  defp rec_set_dep(state, hash, val0, ask) do
    why = {:req_by, hash}

    for dep <- SData.Page.refs(val0) do
      case :ets.lookup(state.store, dep) do
        [{^dep, _, val}] ->
          :ets.insert(state.store, {dep, why, val})
          rec_set_dep(state, dep, val, ask)

        [{^dep, _}] ->
          :ets.insert(state.store, {dep, why})

        [] ->
          init_rec_pull(state, dep, why, ask)
      end
    end
  end

  # After giving up on a fetch, remove its placeholder if it was only cached,
  # so a later get asks peers again instead of waiting on nobody.
  defp drop_cached_placeholder(store, key) do
    case :ets.lookup(store, key) do
      [{^key, {:cached, _}}] -> :ets.delete(store, key)
      _ -> :ok
    end
  end

  # True when the data of block `key` is in the store (not just a placeholder).
  defp have_data?(store, key), do: match?([{_, _, _}], :ets.lookup(store, key))

  # A `why` for a cached entry expiring @cache_ttl seconds from now.
  defp fresh_cache_why, do: {:cached, System.os_time(:second) + @cache_ttl}

  defimpl SData.PageStore do
    # Stores `page` and returns its hash together with the unchanged store handle.
    def put(store, page) do
      hash = GenServer.call(store.pid, {:put, page})
      {hash, store}
    end

    # Fetches a page, asking peers when needed. Returns nil when no peer has
    # it or when the call times out (GenServer.call/2 default timeout).
    def get(store, hash) do
      try do
        GenServer.call(store.pid, {:get, hash, store.prefer_ask})
      catch
        :exit, {:timeout, _} -> nil
      end
    end

    # Starts fetching `hash` and, once it arrives, the pages it references, in
    # the background; asks `other_store.prefer_ask` peers if any, random peers
    # otherwise.
    def copy(store, other_store, hash) do
      GenServer.cast(store.pid, {:rec_pull, hash, other_store.prefer_ask})
      store
    end

    # Not implemented: pages are only dropped when their cache entry expires.
    def free(store, _hash) do
      store ## DO SOMETHING???
    end
  end
end