aboutsummaryrefslogblamecommitdiff
path: root/shard/lib/manager.ex
blob: 82984d693ba2ac7b6923ca783ceab6f4a36c8d6a (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
                          
               
                                        








                                                                            
                                   
 
                  
 



                           

                  



                             
 




                            

     

               

                

                                                               

     


                                                         

                                                              

                                                
                                               

     
                                                                        



                                                        

                           
                                                       
         
        
                
       





                                                                
                     

     



                                                           
                                                          




                                                     

     



                                                                 
 



                                                                       

                                                          
                                                               





                                                    
                                              





                                                                  

                                                       
       
                                 
 





                                                       

     





                                                              


                                                        
                                                        

                                                                                                       
       


                     






                                                    


                     





                                                  




                                                         

                                   






                                                                                                                                        


       








                                                    



                                                     


                                             








                                                                     



                                                    
                                            













                                                                                                                                   

       























                                                                        
   
defmodule Shard.Manager do
  @moduledoc """
  Central registry of known peers, local shards and their handler processes.

  The manager process owns four named, protected ETS tables (readable by any
  process, writable only by the manager):

    - `:peer_db` (set)

      `{peer_id, conn_pid | nil, ip, port}`

      `conn_pid` is `nil` while the peer is known but not connected.

    - `:shard_db` (set)

      `{shard_id, manifest, pid}`

      `pid` is the main process registered for the shard; the entry is kept
      after that process exits and is replaced by the next `register/3`.

    - `:shard_procs` (set)

      `{{shard_id, path}, pid}`

    - `:shard_peer_db` (bag)

      `{shard_id, peer_id}`

  And one private table kept in the server state:

    - outbox (bag)

      `{peer_id, message, queued_at}`

      Messages waiting for a connection to `peer_id`; `queued_at` is
      `System.os_time(:second)` at the time the message was queued.
  """

  use GenServer

  require Logger

  # Queued messages older than this many seconds are purged from the outbox.
  @outbox_ttl 60

  @doc """
  Starts the manager and registers it locally under the module name.

  `my_port` is kept in the server state and handed to every outgoing
  `SNet.TCPConn` connection started by the manager.
  """
  @spec start_link(term) :: GenServer.on_start()
  def start_link(my_port) do
    GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
  end

  # Creates the ETS tables described in the moduledoc. The tables belong to the
  # calling process, so they disappear if the manager terminates.
  @impl true
  def init(my_port) do
    :ets.new(:peer_db, [:set, :protected, :named_table])
    :ets.new(:shard_db, [:set, :protected, :named_table])
    :ets.new(:shard_procs, [:set, :protected, :named_table])
    :ets.new(:shard_peer_db, [:bag, :protected, :named_table])
    outbox = :ets.new(:outbox, [:bag, :private])

    {:ok, %{my_port: my_port, outbox: outbox}}
  end

  # Registers `pid` as the main process of a shard unless a live process
  # already holds that role, in which case the caller is told it is redundant.
  @impl true
  def handle_call({:register, shard_id, manifest, pid}, _from, state) do
    if shard_owner_alive?(shard_id) do
      {:reply, :redundant, state}
    else
      Process.monitor(pid)
      :ets.insert(:shard_db, {shard_id, manifest, pid})
      {:reply, :ok, state}
    end
  end

  # Records `pid` as handler for `{shard_id, path}`; the monitor lets the
  # :DOWN handler remove the entry when the process exits.
  @impl true
  def handle_cast({:dispatch_to, shard_id, path, pid}, state) do
    :ets.insert(:shard_procs, {{shard_id, path}, pid})
    Process.monitor(pid)
    {:noreply, state}
  end

  # A peer announced interest in `shards`: remember it for every shard we
  # hold and notify that shard's main process. Unknown shards are skipped.
  def handle_cast({:interested, peer_id, shards}, state) do
    Enum.each(shards, fn shard_id ->
      case :ets.lookup(:shard_db, shard_id) do
        [{^shard_id, _manifest, pid}] ->
          :ets.insert(:shard_peer_db, {shard_id, peer_id})
          GenServer.cast(pid, {:interested, peer_id})

        [] ->
          nil
      end
    end)

    {:noreply, state}
  end

  def handle_cast({:not_interested, peer_id, shard_id}, state) do
    # delete_object removes only this exact tuple. A match-based delete would
    # interpret a remote-supplied :_ or :"$1" as a wildcard.
    :ets.delete_object(:shard_peer_db, {shard_id, peer_id})
    {:noreply, state}
  end

  def handle_cast({:shard_peer_db_insert, shard_id, peer_id}, state) do
    :ets.insert(:shard_peer_db, {shard_id, peer_id})
    {:noreply, state}
  end

  # A connection to peer `pk` is established and handled by `pid`.
  def handle_cast({:peer_up, pk, pid, ip, port}, state) do
    # Any other peer id recorded for this address is obsolete.
    for [other_pk] <- :ets.match(:peer_db, {:"$1", :_, ip, port}), other_pk != pk do
      :ets.delete(:peer_db, other_pk)
      :ets.match_delete(:shard_peer_db, {:_, other_pk})
    end

    :ets.insert(:peer_db, {pk, pid, ip, port})

    # Send interested message for all our shards
    id_list = for {id, _manifest, _pid} <- :ets.tab2list(:shard_db), do: id
    GenServer.cast(pid, {:send_msg, {:interested, id_list}})

    # Send queued messages
    for {_peer, msg, _queued_at} <- :ets.lookup(state.outbox, pk) do
      GenServer.cast(pid, {:send_msg, msg})
    end

    :ets.delete(state.outbox, pk)

    {:noreply, state}
  end

  # The connection is gone; keep the address so the peer can be redialled.
  def handle_cast({:peer_down, pk, ip, port}, state) do
    :ets.insert(:peer_db, {pk, nil, ip, port})
    {:noreply, state}
  end

  # Delivers `msg` to a peer that had no connection when `send/2` looked it up.
  def handle_cast({:connect_and_send, peer_id, msg}, state) do
    case :ets.lookup(:peer_db, peer_id) do
      [{^peer_id, nil, ip, port}] ->
        add_peer(ip, port, state)
        now = System.os_time(:second)
        :ets.insert(state.outbox, {peer_id, msg, now})
        purge_outbox(state.outbox, now - @outbox_ttl)

      [{^peer_id, pid, _ip, _port}] ->
        # The peer connected between the lookup in send/2 and this cast being
        # processed: deliver directly instead of dropping the message.
        GenServer.cast(pid, {:send_msg, msg})

      _ ->
        Logger.info "Dropping message #{inspect msg} for peer #{inspect peer_id}: peer not in database"
    end

    {:noreply, state}
  end

  # Dials every listed peer that is known but currently disconnected.
  def handle_cast({:try_connect, pk_list}, state) do
    for pk <- pk_list do
      case :ets.lookup(:peer_db, pk) do
        [{^pk, nil, ip, port}] ->
          add_peer(ip, port, state)

        _ ->
          nil
      end
    end

    {:noreply, state}
  end

  def handle_cast({:add_peer, ip, port}, state) do
    add_peer(ip, port, state)
    {:noreply, state}
  end

  # A monitored process exited: forget every path handler it was serving.
  @impl true
  def handle_info({:DOWN, _, :process, pid, _}, state) do
    :ets.match_delete(:shard_procs, {:_, pid})
    {:noreply, state}
  end

  # Defining handle_info/2 above replaces the default provided by
  # `use GenServer`; without this clause any stray message would crash the
  # manager and take the ETS tables it owns with it.
  def handle_info(msg, state) do
    Logger.info "Shard.Manager ignoring unexpected message: #{inspect msg}"
    {:noreply, state}
  end

  # True when the shard already has a registered main process that is alive.
  defp shard_owner_alive?(shard_id) do
    case :ets.lookup(:shard_db, shard_id) do
      [{^shard_id, _manifest, owner}] when is_pid(owner) -> Process.alive?(owner)
      _ -> false
    end
  end

  # Deletes outbox entries queued strictly before `cutoff` (seconds).
  # :ets.select_delete/2 only removes objects for which the match spec body
  # evaluates to `true`.
  defp purge_outbox(outbox, cutoff) do
    stale = [{{:_, :_, :"$1"}, [{:<, :"$1", cutoff}], [true]}]
    :ets.select_delete(outbox, stale)
  end

  # Connects to ip:port in a throw-away process so a slow connect never blocks
  # the manager. On success the socket is handed to a new SNet.TCPConn child
  # of Shard.DynamicSupervisor.
  defp add_peer(ip, port, state) do
    my_port = state.my_port

    spawn(fn ->
      case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do
        {:ok, client} ->
          {:ok, pid} =
            DynamicSupervisor.start_child(
              Shard.DynamicSupervisor,
              {SNet.TCPConn, %{socket: client, my_port: my_port}}
            )

          :ok = :gen_tcp.controlling_process(client, pid)

        _ ->
          Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped"
      end
    end)
  end

  # ================
  # PUBLIC INTERFACE
  # ================

  @doc """
  Connect to a peer specified by ip address and port.

  The connection attempt is asynchronous; this function returns immediately.
  """
  @spec add_peer(term, term) :: :ok
  def add_peer(ip, port) do
    GenServer.cast(__MODULE__, {:add_peer, ip, port})
  end

  @doc """
  Send message to a peer specified by peer id.

  If the peer has a live connection the message is handed to it directly;
  otherwise the manager is asked to connect and queue the message.
  """
  @spec send(term, term) :: :ok
  def send(peer_id, msg) do
    case :ets.lookup(:peer_db, peer_id) do
      [{^peer_id, pid, _ip, _port}] when pid != nil ->
        GenServer.cast(pid, {:send_msg, msg})

      _ ->
        GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg})
    end
  end

  @doc """
  Dispatch incoming message to correct shard process.

  Accepted messages:

    * `{shard_id, path, msg}` - forwarded to the handler registered for
      `{shard_id, path}`; if the shard is unknown the peer is answered with
      `{:not_interested, shard_id}`
    * `{:interested, shards}` - the peer is interested in the listed shards
    * `{:not_interested, shard}` - the peer is no longer interested in `shard`
  """
  def dispatch(peer_id, {shard_id, path, msg}) do
    case :ets.lookup(:shard_db, shard_id) do
      [] ->
        __MODULE__.send(peer_id, {:not_interested, shard_id})

      [_] ->
        # A peer that sends us data for a shard evidently holds it: record
        # the association if it is not known yet.
        case :ets.match(:shard_peer_db, {shard_id, peer_id}) do
          [] ->
            GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_id})

          _ ->
            nil
        end

        case :ets.lookup(:shard_procs, {shard_id, path}) do
          [{{^shard_id, ^path}, pid}] ->
            GenServer.cast(pid, {:msg, peer_id, shard_id, path, msg})

          [] ->
            Logger.info("Warning: dropping message for #{inspect shard_id}/#{inspect path}, no handler running.\n\t#{inspect msg}")
        end
    end
  end

  def dispatch(peer_id, {:interested, shards}) do
    GenServer.cast(__MODULE__, {:interested, peer_id, shards})
  end

  def dispatch(peer_id, {:not_interested, shard}) do
    GenServer.cast(__MODULE__, {:not_interested, peer_id, shard})
  end

  @doc """
  Register a process as the main process for a shard.

  Returns either :ok or :redundant, in which case the process must exit.
  """
  @spec register(term, term, pid) :: :ok | :redundant
  def register(shard_id, manifest, pid) do
    GenServer.call(__MODULE__, {:register, shard_id, manifest, pid})
  end

  @doc """
  Register a process as the handler for shard packets for a given path.
  """
  @spec dispatch_to(term, term, pid) :: :ok
  def dispatch_to(shard_id, path, pid) do
    GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid})
  end
end