aboutsummaryrefslogblamecommitdiff
path: root/shard/lib/app/pagestore.ex
blob: 86b07264a2bf9103fa3f1b4b956f9730fe698126 (plain) (tree)
1
2
3
4
5
6
7
8
9
                           
                
                                                               



                                                                                   

                       

                                                             

                      
                                              
                                       






                               

                                       
                                                                                   

                    
                                                                    


     

                                                                

     
                                         

                                                     

                                                                                                                               
 
                                                  
 
                                                                                                            

     
 
                                                          
                                         

                            







                                                              


       


                                               


                         



                                                     









                                       
                                     
                                          
           
                                                                                               

                   
                                                  





                    









                                                                              

                                                    
              
                                 
                                                                                
           

                                    
       
                                        


                                                      
                                              





                                                            










                                                                                              
             













                                                                   
               











                                                                                   
                 
               
                 
              






                                               
               











                                                                        

                 

                       
       
     



                                                                 
                                            
                                                                                         





                                                                                        
                                                         
               
                                                    
         



                            
                                            
                            
                                                      
                                                        
                       
                                                 






                                              
                                        
                                      
                                           
                           
                                                               

                                                       
                                                          

                                                       






                                         

                                                                                     
                                                   



                                                                       

                                     
                                                                                      

     

                            

                                                   



                           
            



                                                                         


                                   


                                         
                                                                          







                              
defmodule SApp.PageStore do
  @moduledoc """
  A module that implements a content-addressable storage (pages,
  identified by the hash of their contents).

  This is not a shard, it is a side process that a shard may use to store its data.

  Uses a DETS table of:

    { page_id, why_have_it }         -- waiting for data
    { page_id, why_have_it, data }   -- once we have the data

  why_have_it := :root
               | {:req_by, some_other_page_id}
               | {:cached, expiry_date}

  `expiry_date` is a Unix timestamp in seconds; `:cached` entries whose expiry
  date has passed are deleted by a periodic cleanup (about once a minute).
  """

  use GenServer

  # Handle used through the SData.PageStore protocol: `pid` is the PageStore
  # process; `prefer_ask` is an optional list of peers to ask first when a page
  # is missing (presumably connection pids — verify against callers).
  @enforce_keys [:pid]
  defstruct [:pid, :prefer_ask]

  # Lifetime of a :cached page, in seconds (ten minutes).
  @cache_ttl 600
  # Delay between two passes of the cache cleanup, in seconds (one minute).
  @clean_cache_every 60
  # Once this many :not_found replies have been received for a page that
  # callers are waiting on, those callers get nil and the request is abandoned.
  @max_failures 4

  defmodule State do
    @moduledoc false
    # reqs:    page_id => MapSet of GenServer `from` tags waiting for that page
    # retries: page_id => number of :not_found replies received so far
    defstruct [:shard_id, :path, :netgroup, :store, :reqs, :retries]
  end

  def start_link(shard_id, path, netgroup) do
    GenServer.start_link(__MODULE__, [shard_id, path, netgroup])
  end

  @impl true
  def init([shard_id, path, netgroup]) do
    Shard.Manager.dispatch_to(shard_id, path, self())

    # DETS tables are named by atoms, hence String.to_atom; the name is built
    # from shard_id/path (presumably a bounded, locally chosen set — verify).
    store_path =
      [Application.get_env(:shard, :data_path), "#{Base.encode16(shard_id)}.#{path}"]
      |> Path.join()
      |> String.to_atom()

    {:ok, store} = :dets.open_file(store_path, type: :set)

    Process.send_after(self(), :clean_cache, 1000)

    {:ok,
     %State{
       shard_id: shard_id,
       path: path,
       netgroup: netgroup,
       store: store,
       reqs: %{},
       retries: %{}
     }}
  end

  # {:get, key, prefer_ask}: replies with the page binary at once if stored,
  #   otherwise defers the reply until the page arrives (or is abandoned -> nil).
  # {:put, bin}: stores `bin` and replies with its hash.
  # {:have_rec, root}: replies whether `root` and all pages it references
  #   (recursively) are stored with their data.
  @impl true
  def handle_call({:get, key, prefer_ask}, from, state) do
    case :dets.lookup(state.store, key) do
      [{_, _, bin}] ->
        {:reply, bin, state}

      [{_, _}] ->
        # Already waiting for this page: just queue the caller.
        # NOTE(review): if an earlier fetch was abandoned after @max_failures
        # :not_found replies, the {key, why} entry stays and nobody re-asks
        # peers here, so this caller waits until its call times out.
        {:noreply, add_request(state, key, from)}

      [] ->
        init_rec_pull(state, key, cached_why(), prefer_ask)
        {:noreply, add_request(state, key, from)}
    end
  end

  def handle_call({:put, bin}, _from, state) do
    hash = SData.bin_hash(bin)
    store_put(state, hash, bin)
    {:reply, hash, state}
  end

  def handle_call({:have_rec, root}, _from, state) do
    {:reply, have_rec(state, root), state}
  end

  # {:rec_pull, hash, ask_to}: start fetching `hash` (as cached) unless we
  #   already have an entry for it.
  # {:msg, ...}: a message from a network peer; ignored unless the peer is in
  #   our network group.
  # {:set_roots, roots}: demote the previous roots and their dependencies to
  #   cached, then mark `roots` as roots and their dependencies as required.
  @impl true
  def handle_cast({:rec_pull, hash, ask_to}, state) do
    # Parenthesized on purpose: without parentheses `hash == []` would be
    # parsed as the second argument of :dets.lookup.
    if :dets.lookup(state.store, hash) == [] do
      init_rec_pull(state, hash, cached_why(), ask_to)
    end

    {:noreply, state}
  end

  def handle_cast({:msg, conn_pid, auth, _shard_id, _path, msg}, state) do
    if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
      {:noreply, handle_peer_msg(msg, conn_pid, state)}
    else
      # Messages from peers outside the group are ignored.
      {:noreply, state}
    end
  end

  def handle_cast({:set_roots, roots}, state) do
    demoted_why = cached_why()

    # Set old roots and their deps as cached
    for ent <-
          :dets.select(state.store, [
            {{:"$1", :root, :"$2"}, [], [:"$$"]},
            {{:"$1", :root}, [], [:"$$"]},
            {{:"$1", {:req_by, :_}, :"$2"}, [], [:"$$"]},
            {{:"$1", {:req_by, :_}}, [], [:"$$"]}
          ]) do
      case ent do
        [id, bin] -> :dets.insert(state.store, {id, demoted_why, bin})
        [id] -> :dets.insert(state.store, {id, demoted_why})
      end
    end

    # Set new roots as roots
    for root <- roots do
      case :dets.lookup(state.store, root) do
        [{^root, _, bin}] ->
          :dets.insert(state.store, {root, :root, bin})
          rec_set_dep(state, root, SData.term_unbin(bin))

        [{^root, _}] ->
          :dets.insert(state.store, {root, :root})

        [] ->
          init_rec_pull(state, root, :root, [])
      end
    end

    {:noreply, state}
  end

  # Periodically deletes :cached entries (waiting or complete) whose expiry
  # time has passed, then re-arms the timer.
  @impl true
  def handle_info(:clean_cache, state) do
    currtime = System.os_time(:second)

    cache_cleanup = [
      {{:_, {:cached, :"$1"}, :_}, [{:<, :"$1", currtime}], [true]},
      {{:_, {:cached, :"$1"}}, [{:<, :"$1", currtime}], [true]}
    ]

    :dets.select_delete(state.store, cache_cleanup)

    Process.send_after(self(), :clean_cache, @clean_cache_every * 1000)
    {:noreply, state}
  end

  @doc """
  Broadcasts a `{:get, key}` request for page `key` to peers of the shard's
  network group (the `3` passed to `SNet.Group.broadcast/3` is presumably the
  number of peers to ask — verify).
  """
  def ask_random_peers(state, key) do
    SNet.Group.broadcast(state.netgroup, {state.shard_id, state.path, {:get, key}}, 3)
  end

  # Reason tag for a page kept only as cache: expires @cache_ttl seconds from now.
  defp cached_why, do: {:cached, System.os_time(:second) + @cache_ttl}

  # Records `from` as waiting for page `key`.
  defp add_request(state, key, from) do
    reqs = Map.update(state.reqs, key, MapSet.new([from]), &MapSet.put(&1, from))
    %{state | reqs: reqs}
  end

  # Stores `bin` under `hash`. Returns the reason the page was being waited
  # for (so the caller can pull its dependencies), or nil if the page was not
  # awaited (stored as cached) or was already fully stored (left unchanged).
  defp store_put(state, hash, bin) do
    case :dets.lookup(state.store, hash) do
      [] ->
        :dets.insert(state.store, {hash, cached_why(), bin})
        nil

      [{_, why}] ->
        :dets.insert(state.store, {hash, why, bin})
        why

      [{_, _, _}] ->
        nil
    end
  end

  # True when `root` and every page it references (recursively) have data.
  # Enum.all?/2 stops at the first missing page.
  defp have_rec(state, root) do
    case :dets.lookup(state.store, root) do
      [{_, _, bin}] ->
        bin
        |> SData.term_unbin()
        |> SData.Page.refs()
        |> Enum.all?(&have_rec(state, &1))

      _ ->
        false
    end
  end

  # Starts fetching page `key`: asks the peers in `prefer_ask` when it is a
  # non-empty list, random group peers otherwise, then records {key, why} as
  # "waiting for data". Callers must first check that `key` has no entry,
  # since this insert would overwrite stored data.
  defp init_rec_pull(state, key, why, prefer_ask) do
    case prefer_ask do
      [_ | _] ->
        for peer <- prefer_ask do
          SNet.Manager.send_pid(peer, {state.shard_id, state.path, {:get, key}})
        end

      _ ->
        ask_random_peers(state, key)
    end

    :dets.insert(state.store, {key, why})
  end

  # A peer asks for a page: answer with its data, or :not_found.
  defp handle_peer_msg({:get, key}, conn_pid, state) do
    reply =
      case :dets.lookup(state.store, key) do
        [{_, _, bin}] -> {:info, key, bin}
        _ -> {:not_found, key}
      end

    SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, reply})
    state
  end

  # A peer sends page data. It is accepted only if it hashes to `hash` and we
  # do not already have it. Waiting callers are answered, the page is stored,
  # and if it was awaited its missing dependencies are pulled.
  defp handle_peer_msg({:info, hash, bin}, conn_pid, state) do
    already_have_it = match?([{_, _, _}], :dets.lookup(state.store, hash))

    if SData.bin_hash(bin) == hash and not already_have_it do
      {pending, reqs} = Map.pop(state.reqs, hash, MapSet.new())

      for from <- pending do
        GenServer.reply(from, bin)
      end

      state = %{state | reqs: reqs, retries: Map.delete(state.retries, hash)}

      case store_put(state, hash, bin) do
        nil -> :ok
        rec_why -> pull_missing_deps(state, hash, bin, rec_why, conn_pid)
      end

      state
    else
      state
    end
  end

  # A peer does not have a page: retry with random peers until @max_failures
  # replies, then answer the waiting callers with nil.
  defp handle_peer_msg({:not_found, key}, _conn_pid, state) do
    case state.reqs[key] do
      nil ->
        # Nobody is waiting on this page: nothing to retry.
        state

      pending ->
        nretry = Map.get(state.retries, key, 0) + 1

        if nretry < @max_failures do
          ask_random_peers(state, key)
          %{state | retries: Map.put(state.retries, key, nretry)}
        else
          for from <- pending do
            GenServer.reply(from, nil)
          end

          %{state | reqs: Map.delete(state.reqs, key), retries: Map.delete(state.retries, key)}
        end
    end
  end

  # Unknown messages from group members are ignored instead of crashing the store.
  defp handle_peer_msg(_msg, _conn_pid, state), do: state

  # Page `hash` (data `bin`) was awaited for reason `why`. Pull every
  # referenced page we have no entry for, asking `conn_pid` (the peer that
  # sent `bin`). Cached pages propagate their expiry; otherwise deps are
  # marked as required by `hash`.
  # NOTE(review): `bin` comes from a remote peer; the hash check does not make
  # its content trustworthy — confirm SData.term_unbin decodes untrusted input
  # safely (e.g. :erlang.binary_to_term with [:safe]).
  defp pull_missing_deps(state, hash, bin, why, conn_pid) do
    sub_why =
      case why do
        {:cached, _} = cached -> cached
        _ -> {:req_by, hash}
      end

    for dep <- SData.Page.refs(SData.term_unbin(bin)),
        :dets.lookup(state.store, dep) == [] do
      init_rec_pull(state, dep, sub_why, [conn_pid])
    end
  end

  # Marks every page referenced by `val0` as required by `hash`, recursing
  # into pages whose data we have and starting a fetch for missing ones.
  defp rec_set_dep(state, hash, val0) do
    for dep <- SData.Page.refs(val0) do
      case :dets.lookup(state.store, dep) do
        [{^dep, _, bin}] ->
          :dets.insert(state.store, {dep, {:req_by, hash}, bin})
          rec_set_dep(state, dep, SData.term_unbin(bin))

        [{^dep, _}] ->
          :dets.insert(state.store, {dep, {:req_by, hash}})

        [] ->
          init_rec_pull(state, dep, {:req_by, hash}, [])
      end
    end
  end

  defimpl SData.PageStore do
    # Serializes `page`, stores it, and returns {hash, store}.
    def put(store, page) do
      bin = SData.term_bin(page)
      hash = GenServer.call(store.pid, {:put, bin})
      {hash, store}
    end

    # Returns the decoded page, or nil if it could not be obtained (abandoned
    # after repeated :not_found replies, or the call timed out).
    def get(store, hash) do
      try do
        case GenServer.call(store.pid, {:get, hash, store.prefer_ask}) do
          nil -> nil
          bin -> SData.term_unbin(bin)
        end
      catch
        :exit, {:timeout, _} -> nil
      end
    end

    # Asynchronously starts pulling `hash` into `store`, asking
    # `other_store`'s preferred peers first.
    def copy(store, other_store, hash) do
      GenServer.cast(store.pid, {:rec_pull, hash, other_store.prefer_ask})
      store
    end

    def free(store, _hash) do
      store ## DO SOMETHING???
    end
  end
end