aboutsummaryrefslogblamecommitdiff
path: root/shard/lib/manager.ex
blob: 8cedce242cf181255c84d31c81af5aed4eccf120 (plain) (tree)
1
2
3
4
5
6
7
8




                                              
                          
               
                                        



              
                                   



               
                                   
 
                  
 



                           

                  



                             
 


                   
                                           
 

     

               

                



                                                                                                          

                                                               

     
                      














                                                                 
                                                            

                                                
                                               

     
                                                                        

                                                                        

               

                           
                                                        
         
        
                
       





                                                                
                     

     

                                                           
                                               
                                  
                                                           




                                                     

     
                                                                 
                                                           

                     
 
                                                                       
                                                     

                     

                                                          
                                                                

                                   

                                                     

         
                                               

                                                
                                                                         


                                                            

                                                       
       
                                 
 



                                                       
                                               
                     

     
                                                              
                                           



                                                           


                                                        
                                                        

                                                                                                       
       


                     

                                                    
                                        



                                   


                     





                                                  




                                                         

                                   






                                                                                                                                        


       








                                                    



                                                     


                                             
                           
                                           






                                                                     



                                                    
                                             


                                                             
                                                                









                                                                                                                                   

       























                                                                        











                                                                    
   
defprotocol Shard.Manifest do
  @doc "Start the corresponding Shard process"
  def start(manifest)
end

defmodule Shard.Manager do
  @moduledoc"""
    Maintains several important tables :

    - :peer_db
      
      List of
        { id, pid | nil, ip, port }

    - :shard_db
    
      List of
        { id, manifest, pid | nil }

    - :shard_procs

      List of
        { {id, path}, pid }

    - :shard_peer_db

      Mult-list of
        { shard_id, peer_id }


    And an internal table :

    - :outbox
      
      Multi-list of
        { peer_id, message, time_inserted }

  """

  use GenServer

  require Logger

  @peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom
  @shard_db [Application.get_env(:shard, :data_path), "shard_db"] |> Path.join |> String.to_atom
  @shard_peer_db [Application.get_env(:shard, :data_path), "shard_peer_db"] |> Path.join |> String.to_atom

  def start_link(my_port) do
    GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
  end

  def init(my_port) do
    :dets.open_file(@peer_db, [type: :set])
    for [{id, _pid, ip, port}] <- :dets.match @peer_db, :"$1" do
      :dets.insert @peer_db, {id, nil, ip, port}
      # connect blindly to everyone
      add_peer(ip, port)
    end

    :dets.open_file(@shard_db, [type: :set])
    for [{id, manifest, _pid}] <- :dets.match @shard_db, :"$1" do
      :dets.insert @shard_db, {id, manifest, nil}
      spawn fn -> Shard.Manifest.start manifest end
    end

    :dets.open_file(@shard_peer_db, [type: :bag])

    :ets.new(:shard_procs, [:set, :protected, :named_table])
    outbox = :ets.new(:outbox, [:bag, :private])

    {:ok, %{my_port: my_port, outbox: outbox} }
  end

  def handle_call({:register, shard_id, manifest, pid}, _from, state) do
    will_live = case :dets.lookup(@shard_db, shard_id) do
      [{ ^shard_id, _, pid }] when pid != nil -> not Process.alive?(pid)
      _ -> true
    end
    reply = if will_live do
      Process.monitor(pid)
      :dets.insert(@shard_db, {shard_id, manifest, pid})
      :ok
    else
      :redundant
    end
    {:reply, reply, state}
  end

  def handle_cast({:dispatch_to, shard_id, path, pid}, state) do
    :ets.insert(:shard_procs, { {shard_id, path}, pid })
    Process.monitor(pid)
    {:noreply, state}
  end

  def handle_cast({:interested, peer_id, shards}, state) do
    for shard_id <- shards do
      case :dets.lookup(@shard_db, shard_id) do
        [{ ^shard_id, _, pid }] ->
          :dets.insert(@shard_peer_db, {shard_id, peer_id})
          GenServer.cast(pid, {:interested, peer_id})
        [] -> nil
      end
    end
    {:noreply, state}
  end

  def handle_cast({:not_interested, peer_id, shard_id}, state) do
    :dets.match_delete(@shard_peer_db, {shard_id, peer_id})
    {:noreply, state}
  end

  def handle_cast({:shard_peer_db_insert, shard_id, peer_id}, state) do
    :dets.insert(@shard_peer_db, {shard_id, peer_id})
    {:noreply, state}
  end

  def handle_cast({:peer_up, pk, pid, ip, port}, state) do
    for [pk2] <- :dets.match(@peer_db, {:'$1', :_, ip, port}) do
      if pk2 != pk do
        # obsolete peer information
        :dets.delete(@peer_db, pk2)
        :dets.match_delete(@shard_peer_db, {:_, pk2})
      end
    end
    :dets.insert(@peer_db, {pk, pid, ip, port})

    # Send interested message for all our shards
    id_list = (for [{id, _, _}] <- :dets.match(@shard_db, :"$1"), do: id)
    GenServer.cast(pid, {:send_msg, {:interested, id_list}})

    # Send queued messages
    for {_, msg, _} <- :ets.lookup(state.outbox, pk) do
      GenServer.cast(pid, {:send_msg, msg})
    end
    :ets.delete(state.outbox, pk)

    {:noreply, state}
  end

  def handle_cast({:peer_down, pk, ip, port}, state) do
    :dets.insert(@peer_db, {pk, nil, ip, port})
    {:noreply, state}
  end

  def handle_cast({:connect_and_send, peer_id, msg}, state) do
    case :dets.lookup(@peer_db, peer_id) do
      [{^peer_id, nil, ip, port}] ->
        add_peer(ip, port, state)
        currtime = System.os_time :second
        :ets.insert(state.outbox, {peer_id, msg, currtime})
        outbox_cleanup = [{{:_, :_, :'$1'},
                           [{:<, :'$1', currtime - 60}],
                           [:'$1']}]
        :ets.select_delete(state.outbox, outbox_cleanup)
      _ ->
        Logger.info "Dropping message #{inspect msg} for peer #{inspect peer_id}: peer not in database"
    end
    {:noreply, state}
  end

  def handle_cast({:try_connect, pk_list}, state) do
    for pk <- pk_list do
      case :dets.lookup(@peer_db, pk) do
        [{^pk, nil, ip, port}] ->
          add_peer(ip, port, state)
        _ -> nil
      end
    end
    {:noreply, state}
  end

  def handle_cast({:add_peer, ip, port}, state) do
    add_peer(ip, port, state)
    {:noreply, state}
  end

  def handle_info({:DOWN, _, :process, pid, _}, state) do
    :ets.match_delete(:shard_procs, {:_, pid})
    {:noreply, state}
  end

  defp add_peer(ip, port, state) do
    spawn fn ->
      case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do
        {:ok, client} ->
          {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}})
          :ok = :gen_tcp.controlling_process(client, pid)
        _ ->
          Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped"
        end
    end
  end


  # ================
  # PUBLIC INTERFACE
  # ================
  

  @doc"""
  Connect to a peer specified by ip address and port
  """
  def add_peer(ip, port) do
    GenServer.cast(__MODULE__, {:add_peer, ip, port})
  end

  @doc"""
  Send message to a peer specified by peer id
  """
  def send(peer_id, msg) do
    case :dets.lookup(@peer_db, peer_id) do
      [{ ^peer_id, pid, _, _}] when pid != nil->
        GenServer.cast(pid, {:send_msg, msg})
      _ -> 
        GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg})
    end
  end

  @doc"""
  Dispatch incoming message to correct shard process
  """
  def dispatch(peer_id, {shard_id, path, msg}) do
    case :dets.lookup(@shard_db, shard_id) do
      [] ->
        __MODULE__.send(peer_id, {:not_interested, shard_id})
      [_] ->
        case :dets.match(@shard_peer_db, {shard_id, peer_id}) do
          [] ->
            GenServer.cast(__MODULE__, {:shard_peer_db_insert, shard_id, peer_id})
          _ -> nil
        end
        case :ets.lookup(:shard_procs, {shard_id, path}) do
          [{ {^shard_id, ^path}, pid }] ->
            GenServer.cast(pid, {:msg, peer_id, shard_id, path, msg})
          [] ->
            Logger.info("Warning: dropping message for #{inspect shard_id}/#{inspect path}, no handler running.\n\t#{inspect msg}")
        end
    end
  end

  def dispatch(peer_id, {:interested, shards}) do
    GenServer.cast(__MODULE__, {:interested, peer_id, shards})
  end

  def dispatch(peer_id, {:not_interested, shard}) do
    GenServer.cast(__MODULE__, {:not_interested, peer_id, shard})
  end

  @doc"""
  Register a process as the main process for a shard.

  Returns either :ok or :redundant, in which case the process must exit.
  """
  def register(shard_id, manifest, pid) do
    GenServer.call(__MODULE__, {:register, shard_id, manifest, pid})
  end

  @doc"""
  Register a process as the handler for shard packets for a given path.
  """
  def dispatch_to(shard_id, path, pid) do
    GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid})
  end

  def list_shards() do
    for [x] <- :dets.match(@shard_db, :"$1"), do: x
  end

  def list_peers() do
    for [x] <- :dets.match(@peer_db, :"$1"), do: x
  end

  def get_shard_peers(shard_id) do
    for [x] <- :dets.match(@shard_peer_db, {shard_id, :"$1"}), do: x
  end
end