aboutsummaryrefslogblamecommitdiff
path: root/shard/lib/manager.ex
blob: 990bcead8be880752a1933e15c972f889650b573 (plain) (tree)
1
2
3
4
5
6
7
                             





                                                                                       





                                                                                       


         
                             
     
                      

   
                          
               
                                        
 
                                      

             




                                                                                        
 




                                                                                

                                                           

                                   
 

                           

     

               

                


                                      
                                                                                                
                                                                                              
 

                                                           

     
                
                                  
 
                                            
                                           
 
                                                            
 
                                                  
              

     
                                                                        

                                                           



                                                                



                                                 
                     

     

                                                                   
                     

     
                                                                   
                                                       

                     
 













































































                                                                                                

                                                              
     
 







                                                  

     




















































                                                                                      
 


                          
 



                                                                   

                                               
                                          









                                                                              

     
                                                                        
                                                                   

     
                                                                   
                                             
           
                                                                          
                                       

                                                                          
               

                                                                            
           
                                                                        

       
 
 


                    

         













                                                                        
 
         
                                                                                   
     
                                  
                                                                         
     




                                          
                                             
                                         







                                    
                                                              
     

 


                    

         


                                          

                                                      




              




                                                      



                                                                  



         


                                                          

                      
                                                                            
     
   
defprotocol Shard.Manifest do
  @moduledoc"""
  A shard manifest is a data structure that uniquely defines the identity of the shard.

  The hash of the manifest is the unique identifier of that shard on the network.
  
  The Manifest protocol is a protocol implemented by the manifest structs for the
  different shard types. It contains an operation module() that returns the main module
  for the shard processes. The module must contain a function with the signature:

    {:ok, pid} = <module>.start_link(manifest)

  that will be called when the shard must be started.
  """

  @doc"""
  Get the module in question.
  """
  def module(manifest)
end

defmodule Shard.Manager do
  @moduledoc"""
    Maintains several important tables :

    - :shard_db (persistent with DETS)
    
      List of
        { id, manifest, why_have_it, state }

      why_have_it := {:pinned, %MapSet{who requires it...}, %MapSet{who it requires...}}
                   | {:req, %MapSet{who requires it...}, %MapSet{who it requires...}}
                   | {:cached, expiry_date}

    - :peer_db (persistent with DETS)

      Mult-list of
        { shard_id, peer_info }   # TODO: add health info (last seen, ping, etc)

      peer_info := {:inet, ip, port}
      TODO peer_info |= {:inet6, ip, port} | {:onion, name}

    - :shard_procs (not persistent)

      List of
        { {id, path}, pid }
  """

  use GenServer

  require Logger

  @cache_ttl 3600*24      # 24 hours
  @clean_cache_every 60   # one minute

  @shard_db [Application.get_env(:shard, :data_path), "shard_db"] |> Path.join |> String.to_atom
  @peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom

  def start_link(_) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  def init(_) do
    Process.flag(:trap_exit, true)

    :dets.open_file(@shard_db, [type: :set])
    :dets.open_file(@peer_db, [type: :bag])

    :ets.new(:shard_procs, [:set, :protected, :named_table])

    Process.send_after(self(), :clean_cache, 1000)
    {:ok, %{}}
  end

  def handle_call({:find_or_start, shard_id, manifest}, _from, state) do
    {pid, state} = find_or_start(state, shard_id, manifest)
    {:reply, pid, state}
  end

  def handle_cast({:dispatch_to, shard_id, path, pid}, state) do
    :ets.insert(:shard_procs, { {shard_id, path}, pid })
    state = Map.put(state, pid, {shard_id, path})
    if path != nil do
      Process.monitor(pid)
    end
    {:noreply, state}
  end

  def handle_cast({:peer_db_insert, shard_id, peer_info}, state) do
    :dets.insert(@peer_db, {shard_id, peer_info})
    {:noreply, state}
  end

  def handle_cast({:peer_db_delete, shard_id, peer_info}, state) do
    :dets.match_delete(@peer_db, {shard_id, peer_info})
    {:noreply, state}
  end

  def handle_cast({:save_state, shard_id, shst}, state) do
    case :dets.lookup(@shard_db, shard_id) do
      [{^shard_id, manifest, why_have_it, _old_state}] -> 
        :dets.insert(@shard_db, {shard_id, manifest, why_have_it, shst})
    end
    {:noreply, state}
  end

  def handle_cast({:pin, shard_id}, state) do
    case :dets.lookup(@shard_db, shard_id) do
      [{^shard_id, manifest, {:cached, _}, shst}] ->
        :dets.insert(@shard_db, {shard_id, manifest, {:pinned, %MapSet{}, %MapSet{}}, shst})
        {pid, state} = find_or_start(state, shard_id, manifest)
        GenServer.cast(pid, :send_deps)
        {:noreply, state}
      [{^shard_id, manifest, {:req, a, b}, shst}] ->
        :dets.insert(@shard_db, {shard_id, manifest, {:pinned, a, b}, shst})
        {:noreply, state}
      _ ->
        {:noreply, state}
    end
  end

  def handle_cast({:unpin, shard_id}, state) do
    case :dets.lookup(@shard_db, shard_id) do
      [{^shard_id, manifest, {:pinned, a, b}, shst}] ->
        if MapSet.size(a) > 0 do
          :dets.insert(@shard_db, {shard_id, manifest, {:req, a, b}, shst})
        else
          for dep <- b do
            rm_dep_link(shard_id, dep)
          end
          :dets.insert(@shard_db, {shard_id, manifest, cached(), shst})
        end
      _ -> nil
    end
    {:noreply, state}
  end

  def handle_cast({:dep_list, shard_id, manifests}, state) do
    case :dets.lookup(@shard_db, shard_id) do
      [{^shard_id, manifest, {reason, a, b}, shst}] when reason == :pinned or reason == :req ->
        bnew_pairs = Enum.map(manifests, fn m -> {SData.term_hash(m), m} end)
        bnew_map = Enum.reduce(bnew_pairs, %{}, fn {id, m}, map -> Map.put(map, id, m) end)
        bnew_set = Enum.reduce(bnew_pairs, %MapSet{}, fn {id, _m}, ms -> MapSet.put(ms, id) end)
        state = MapSet.difference(bnew_set, b)
                |> Enum.reduce(state, fn idadd, state ->
                    add_dep_link(state, shard_id, idadd, bnew_map[idadd])
                  end)
        for idrm <- MapSet.difference(b, bnew_set) do
          rm_dep_link(shard_id, idrm)
        end
        :dets.insert(@shard_db, {shard_id, manifest, {reason, a, bnew_set}, shst})
        {:noreply, state}
      _ ->
        {:noreply, state}
    end
  end

  def handle_info(:clean_cache, state) do
    currtime = System.os_time :seconds

    shards = :dets.select(@shard_db, [{
      {:'$1', :_, {:cached, :'$2'}, :_}, [{:<, :'$2', currtime}], [:'$1']}
    ])
    for [id] <- shards do
      case :ets.lookup(:shard_procs, {id, nil}) do
        [{{^id, nil}, pid}] ->
          GenServer.cast(pid, :delete_shard)
        _ -> nil
      end
      :dets.delete(@shard_db, id)
    end

    Process.send_after(self(), :clean_cache, @clean_cache_every * 1000)
    {:noreply, state}
  end

  def handle_info({:DOWN, _, :process, pid, reason}, state) do
    handle_info({:EXIT, pid, reason}, state)
  end

  def handle_info({:EXIT, pid, _reason}, state) do
    case state[pid] do
      nil -> {:noreply, state}
      info ->
        :ets.delete(:shard_procs, info)
        state = Map.delete(state, pid)
        {:noreply, state}
    end
  end

  defp find_or_start(state, shard_id, manifest) do
    case :dets.lookup(@shard_db, shard_id) do
      [] ->
        :dets.insert(@shard_db, {shard_id, manifest, cached(), nil})
      [{^shard_id, ^manifest, {:cached, _}, shst}] ->
        :dets.insert(@shard_db, {shard_id, manifest, cached(), shst})
      _ -> nil
    end

    case :ets.lookup(:shard_procs, {shard_id, nil}) do
      [] ->
        {:ok, pid} = apply(Shard.Manifest.module(manifest), :start_link, [manifest])
        :ets.insert(:shard_procs, {{shard_id, nil}, pid})
        state = Map.put(state, pid, {shard_id, nil})
        {pid, state}
      [{{^shard_id, nil}, pid}] ->
        {pid, state}
    end
  end

  defp cached() do
    {:cached, System.os_time(:seconds) + @cache_ttl}
  end

  defp add_dep_link(state, shard_id, id2, m2) do
    case :dets.lookup(@shard_db, id2) do
      [{^id2, ^m2, {reason, a, b}, shst}] when reason == :pinned or reason == :req ->
        :dets.insert(@shard_db, {id2, m2, {reason, MapSet.put(a, shard_id), b}, shst})
        state
      _ ->
        a = MapSet.new() |> MapSet.put(shard_id)
        :dets.insert(@shard_db, {id2, m2, {:req, a, %MapSet{}}, nil})
        {pid, state} = find_or_start(state, id2, m2)
        GenServer.cast(pid, :send_deps)
        state
    end
  end

  defp rm_dep_link(shard_id, id2) do
    case :dets.lookup(@shard_db, id2) do
      [{^id2, m2, {reason, a, b}, shst}] when reason == :pinned or reason == :req ->
        a2 = MapSet.delete(a, shard_id)
        if reason == :req and MapSet.size(a2) == 0 do
          :dets.insert(@shard_db, {id2, m2, cached(), shst})
          for dep <- b do
            rm_dep_link(id2, dep)
          end
        else
          :dets.insert(@shard_db, {id2, m2, {reason, a2, b}, shst})
        end
    end
  end


  # ======================
  # CALLED BY SNet.TcpConn
  # ======================

  @doc"""
  Dispatch incoming message to correct shard process
  """
  def incoming(conn_pid, peer_info, auth, {:interested, shards}) do
    for shard_id <- shards do
      case :dets.lookup(@shard_db, shard_id) do
        [{ ^shard_id, manifest, _, _ }] ->
          GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info})
          pid = case :ets.lookup(:shard_procs, {shard_id, nil}) do
            [] ->
              GenServer.call(__MODULE__, {:find_or_start, shard_id, manifest})
            [{{^shard_id, nil}, pid}] -> pid
          end
          GenServer.cast(pid, {:interested, conn_pid, auth})
        [] -> nil
      end
    end
  end

  def incoming(_conn_pid, peer_info, _auth, {:not_interested, shard}) do
    GenServer.cast(__MODULE__, {:peer_db_delete, shard, peer_info})
  end

  def incoming(conn_pid, peer_info, auth, {shard_id, path, msg}) do
    case :dets.lookup(@shard_db, shard_id) do
      [] ->
        GenServer.cast(conn_pid, {:send_msg, {:not_interested, shard_id}})
      [{ ^shard_id, manifest, _, _}] ->
        GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info})
        pid = case :ets.lookup(:shard_procs, {shard_id, path}) do
          [] ->
            GenServer.call(__MODULE__, {:find_or_start, shard_id, manifest})
          [{ {^shard_id, ^path}, pid }] -> pid
        end
        GenServer.cast(pid, {:msg, conn_pid, auth, shard_id, path, msg})
    end
  end


  # ================
  # CALLED BY Sapp.*
  # ================

  @doc"""
  Register a process as the main process for a shard.

  Returns either :ok or :redundant, in which case the process must exit.
  """
  def register(shard_id, manifest, pid) do
    GenServer.call(__MODULE__, {:register, shard_id, manifest, pid})
  end

  @doc"""
  Register a process as the handler for shard packets for a given path.
  """
  def dispatch_to(shard_id, path, pid) do
    GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid})
  end

  @doc"""
  Return the list of all peer info for peers that are interested in a certain shard
  """
  def get_shard_peers(shard_id) do
    for {_, peer_info} <- :dets.lookup(@peer_db, shard_id), do: peer_info
  end

  @doc"""
  Return the saved state value for a shard
  """
  def load_state(shard_id) do
    case :dets.lookup(@shard_db, shard_id) do
      [{^shard_id, _, _, state}] -> state
      _ -> nil
    end
  end

  @doc"""
  Save a state value for a shard
  """
  def save_state(shard_id, state) do
    GenServer.cast(__MODULE__, {:save_state, shard_id, state})
  end


  # ================
  # CALLED BY ANYONE
  # ================

  @doc"""
  Returns the pid for a shard if it exists
  """
  def find_proc(shard_id) do
    case :ets.lookup(:shard_procs, {shard_id, nil}) do
      [{{^shard_id, _}, pid}] -> pid
      _ -> nil
    end
  end

  @doc"""
  Returns the pid for a shard defined by its manifest.
  Start it if it doesn't exist.
  """
  def find_or_start(manifest) do
    id = SData.term_hash manifest
    case :ets.lookup(:shard_procs, {id, nil}) do
      [{{^id, nil}, pid}] -> pid
      [] ->
        GenServer.call(__MODULE__, {:find_or_start, id, manifest})
    end
  end

  @doc"""
  Return the list of all shards. Returns a list of tuples:

      {id, manifest, why_have_it}
  """
  def list_shards() do
    for [{id, m, why, _}] <- :dets.match(@shard_db, :"$1"), do: {id, m, why}
  end
end