aboutsummaryrefslogblamecommitdiff
path: root/shard/lib/manager.ex
blob: 3a0e21cde3f591c67dab6c5d34b339441ae31eb6 (plain) (tree)
1
2
3
4
5
6
7
                             





                                                                                       





                                                                                       


         
                             
     
                      

   
                          
               
                                        
 
                                      

             
                               
 




                                                                                

                                                           

                                   
 

                           

     

               

                
                                                                                                
                                                                                              
 

                                                           

     
                
                                  
 
                                            
                                           
 
                                                            
 
              

     



                                                                        
       








                                                                                    
       



                                                                



                                                 
                     

     

                                                                   
                     

     
                                                                   
                                                       

                     
 

                                                              
     
 







                                                  

     
 


                          
 



                                                                   












                                                                              

     
                                                                        
                                                                   

     
                                                                   
                                             
           
                                                                          


                                                                          
               

                                                                            
           
                                                                        

       
 
 


                    

         













                                                                        
 
         
                                                                                   
     
                                  
                                                                         
     




                                          

                                             







                                    



                                                            
     

 


                    

         


                                          

                                                      




              




                                                      



                                                                  



         




                                                   
   
defprotocol Shard.Manifest do
  @moduledoc """
  Protocol implemented by the manifest struct of every shard type.

  A shard manifest is the data structure that uniquely defines the identity
  of a shard: the hash of the manifest is the unique identifier of that shard
  on the network.

  The protocol has a single operation, `module/1`, which returns the main
  module for the shard processes. That module must export a function that is
  called whenever the shard must be started, with the signature:

      {:ok, pid} = <module>.start_link(manifest)
  """

  @doc """
  Returns the main module for the processes of the shard described by `manifest`.
  """
  @spec module(t) :: module
  def module(manifest)
end

defmodule Shard.Manager do
  @moduledoc """
  Registry and persistent store for the shards known to this node.

  The manager is a single `GenServer`, registered under the module name, that
  maintains several important tables:

    * the shard table (persistent with DETS, type `:set`)

      List of
          { id, manifest, state }

    * the peer table (persistent with DETS, type `:bag`)

      Multi-list of
          { shard_id, peer_info }   # TODO: add health info (last seen, ping, etc)

          peer_info := {:inet, ip, port}
          TODO peer_info |= {:inet6, ip, port} | {:onion, name}

    * `:shard_procs` (ETS, not persistent)

      List of
          { {id, path}, pid }

      The entry whose `path` is `nil` is the main process of the shard.

  `:shard_procs` is a protected named table: any process may read it, but only
  the manager writes to it. The manager's own state maps every tracked pid to
  the set of `{id, path}` keys that pid was registered for, so that all of its
  entries can be removed when the process goes down.
  """

  use GenServer

  require Logger

  # NOTE: both names are computed at compile time, so the `:data_path` setting
  # of the `:shard` application must be available when this module is compiled.
  @shard_db [Application.get_env(:shard, :data_path), "shard_db"] |> Path.join |> String.to_atom
  @peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom

  @doc """
  Starts the manager and registers it under the name `Shard.Manager`.

  The argument is ignored.
  """
  @spec start_link(term) :: GenServer.on_start()
  def start_link(_) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end


  # ================
  # SERVER CALLBACKS
  # ================

  @impl true
  def init(_) do
    # Shard processes started by the manager are linked to it; trapping exits
    # turns their termination into {:EXIT, pid, reason} messages.
    Process.flag(:trap_exit, true)

    with {:ok, _} <- :dets.open_file(@shard_db, type: :set),
         {:ok, _} <- :dets.open_file(@peer_db, type: :bag) do
      :ets.new(:shard_procs, [:set, :protected, :named_table])
      {:ok, %{}}
    else
      # Refuse to start rather than fail later on every table access.
      {:error, reason} -> {:stop, {:dets_open_failed, reason}}
    end
  end

  @impl true
  def handle_call({:find_or_start, shard_id, manifest}, _from, state) do
    # Only records the shard if it is not known yet, so a saved state is kept.
    :dets.insert_new(@shard_db, {shard_id, manifest, nil})

    key = {shard_id, nil}

    case :ets.lookup(:shard_procs, key) do
      [{^key, pid}] ->
        # Another caller got here first: reply with the pid itself.
        {:reply, pid, state}

      [] ->
        {:ok, pid} = Shard.Manifest.module(manifest).start_link(manifest)
        {:reply, pid, track(state, key, pid)}
    end
  end

  def handle_call({:register, shard_id, manifest, pid}, _from, state) do
    key = {shard_id, nil}

    case :ets.lookup(:shard_procs, key) do
      [{^key, other}] when other != pid ->
        # A different process already is the main process for this shard.
        {:reply, :redundant, state}

      _ ->
        :dets.insert_new(@shard_db, {shard_id, manifest, nil})
        {:reply, :ok, track(state, key, pid)}
    end
  end

  @impl true
  def handle_cast({:dispatch_to, shard_id, path, pid}, state) do
    {:noreply, track(state, {shard_id, path}, pid)}
  end

  def handle_cast({:peer_db_insert, shard_id, peer_info}, state) do
    :dets.insert(@peer_db, {shard_id, peer_info})
    {:noreply, state}
  end

  def handle_cast({:peer_db_delete, shard_id, peer_info}, state) do
    # delete_object removes exactly this tuple; nothing inside peer_info is
    # interpreted as a match pattern.
    :dets.delete_object(@peer_db, {shard_id, peer_info})
    {:noreply, state}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
    {:noreply, forget(state, pid)}
  end

  def handle_info({:EXIT, pid, _reason}, state) do
    {:noreply, forget(state, pid)}
  end

  def handle_info(msg, state) do
    Logger.debug("Shard.Manager ignoring unexpected message: #{inspect(msg)}")
    {:noreply, state}
  end

  # Records `pid` as the handler for `key` in :shard_procs and in the state.
  # A pid is monitored the first time it is seen, so that every tracked
  # process produces a :DOWN message whether or not it is linked.
  defp track(state, key, pid) do
    :ets.insert(:shard_procs, {key, pid})

    case state do
      %{^pid => keys} ->
        %{state | pid => MapSet.put(keys, key)}

      _ ->
        Process.monitor(pid)
        Map.put(state, pid, MapSet.new([key]))
    end
  end

  # Drops every :shard_procs entry still owned by `pid` and stops tracking it.
  # Entries that were since taken over by another process are left alone.
  defp forget(state, pid) do
    case Map.pop(state, pid) do
      {nil, state} ->
        state

      {keys, state} ->
        Enum.each(keys, fn key -> :ets.delete_object(:shard_procs, {key, pid}) end)
        state
    end
  end

  # Returns the main process of a shard, asking the manager to start it when
  # no main process is registered.
  defp main_proc(shard_id, manifest) do
    case :ets.lookup(:shard_procs, {shard_id, nil}) do
      [{_, pid}] -> pid
      [] -> GenServer.call(__MODULE__, {:find_or_start, shard_id, manifest})
    end
  end


  # ======================
  # CALLED BY SNet.TcpConn
  # ======================

  @doc """
  Dispatch incoming message to correct shard process

  `message` is one of:

    * `{:interested, shards}` - for every listed shard id that is known
      locally, the peer is recorded in the peer table and
      `{:interested, conn_pid, auth}` is cast to the shard's main process
      (started if needed). Unknown shard ids are skipped.

    * `{:not_interested, shard}` - the peer is removed from the peer table for
      that shard.

    * `{shard_id, path, msg}` - if the shard is unknown,
      `{:send_msg, {:not_interested, shard_id}}` is cast back to `conn_pid`.
      Otherwise the peer is recorded and
      `{:msg, conn_pid, auth, shard_id, path, msg}` is cast to the process
      registered for `{shard_id, path}`, or to the shard's main process
      (started if needed) when no process is registered for that path.
  """
  def incoming(conn_pid, peer_info, auth, message)

  def incoming(conn_pid, peer_info, auth, {:interested, shards}) do
    for shard_id <- shards do
      case :dets.lookup(@shard_db, shard_id) do
        [{^shard_id, manifest, _}] ->
          GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info})

          shard_id
          |> main_proc(manifest)
          |> GenServer.cast({:interested, conn_pid, auth})

        [] ->
          nil
      end
    end
  end

  def incoming(_conn_pid, peer_info, _auth, {:not_interested, shard}) do
    GenServer.cast(__MODULE__, {:peer_db_delete, shard, peer_info})
  end

  def incoming(conn_pid, peer_info, auth, {shard_id, path, msg}) do
    case :dets.lookup(@shard_db, shard_id) do
      [] ->
        GenServer.cast(conn_pid, {:send_msg, {:not_interested, shard_id}})

      [{^shard_id, manifest, _}] ->
        GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info})

        pid =
          case :ets.lookup(:shard_procs, {shard_id, path}) do
            [{_, pid}] -> pid
            [] -> main_proc(shard_id, manifest)
          end

        GenServer.cast(pid, {:msg, conn_pid, auth, shard_id, path, msg})
    end
  end


  # ================
  # CALLED BY Sapp.*
  # ================

  @doc """
  Register a process as the main process for a shard.

  Returns either :ok or :redundant, in which case the process must exit.
  `:redundant` is returned when a different process is already registered as
  the main process for `shard_id`.

  This is a synchronous call to the manager, so it must not be made from a
  process the manager is currently waiting on (for instance from the `init`
  of a shard process that the manager itself is starting).
  """
  def register(shard_id, manifest, pid) do
    GenServer.call(__MODULE__, {:register, shard_id, manifest, pid})
  end

  @doc """
  Register a process as the handler for shard packets for a given path.

  The registration is asynchronous and replaces any previous handler for the
  same `{shard_id, path}`. It is removed when `pid` terminates.
  """
  def dispatch_to(shard_id, path, pid) do
    GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid})
  end

  @doc """
  Return the list of all peer info for peers that are interested in a certain shard
  """
  def get_shard_peers(shard_id) do
    for {_, peer_info} <- :dets.lookup(@peer_db, shard_id), do: peer_info
  end

  @doc """
  Return the saved state value for a shard

  Returns `nil` when the shard is unknown or no state was saved yet.
  """
  def load_state(shard_id) do
    case :dets.lookup(@shard_db, shard_id) do
      [{^shard_id, _, state}] -> state
      _ -> nil
    end
  end

  @doc """
  Save a state value for a shard

  The shard must already be present in the shard table; a `CaseClauseError`
  is raised otherwise. The write is done by the calling process.
  """
  def save_state(shard_id, state) do
    case :dets.lookup(@shard_db, shard_id) do
      [{^shard_id, manifest, _old_state}] ->
        :dets.insert(@shard_db, {shard_id, manifest, state})
    end
  end


  # ================
  # CALLED BY ANYONE
  # ================

  @doc """
  Returns the pid for a shard if it exists

  Only the main process (the one registered with a `nil` path) is looked up;
  `nil` is returned when there is none.
  """
  @spec find_proc(term) :: pid | nil
  def find_proc(shard_id) do
    case :ets.lookup(:shard_procs, {shard_id, nil}) do
      [{{^shard_id, _}, pid}] -> pid
      _ -> nil
    end
  end

  @doc """
  Returns the pid for a shard defined by its manifest.
  Start it if it doesn't exist.

  The shard id is `SData.term_hash(manifest)`.
  """
  def find_or_start(manifest) do
    manifest
    |> SData.term_hash()
    |> main_proc(manifest)
  end

  @doc """
  Return the list of all shards.

  Each element is a `{id, manifest, state}` tuple of the shard table.
  """
  def list_shards() do
    for [x] <- :dets.match(@shard_db, :"$1"), do: x
  end
end