aboutsummaryrefslogblamecommitdiff
path: root/lib/manager.ex
blob: 87f95c5d9b9ac2477db157d5fe16b617d2098a72 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
                          
               
                                    










                                                                            

                          




                             




                            

     

               

                

                                                               

     






                                                                             


                                                     

















                                                                 

     









                                                               

     





                                                          
       






                                                       

     





                                                              


                                                        


                                                        


                     






                                                    


                     







                                                  






                                                                                                                                        























                                                                     
   
defmodule Shard.Manager do
  @moduledoc"""
    Maintains two important tables :

    - :peer_db
      
      List of
        { id, {conn_pid, con_start, conn_n_msg} | nil, ip, port, last_seen }

    - :shard_db
    
      List of
        { id, manifest, shard_pid }

    And also some others :

    - :peer_shard_db

      Mult-list of
        { peer_id, shard_id }

    - :outbox
      
      Multi-list of
        { peer_id, message }

  """

  use GenServer

  require Logger

  def start_link(my_port) do
    GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
  end

  def init(my_port) do
    :ets.new(:peer_db, [:set, :protected, :named_table])
    :ets.new(:shard_db, [:set, :protected, :named_table])
    peer_shard_db = :ets.new(:peer_shard_db, [:bag, :private])
    outbox = :ets.new(:outbox, [:bag, :private])

    {:ok, %{my_port: my_port, peer_shard_db: peer_shard_db, outbox: outbox} }
  end

  def handle_call({:find, shard_id}, _from, state) do
    reply = case :ets.lookup(:shard_db, shard_id) do
      [{ ^shard_id, _, pid }] -> {:ok, pid}
      [] -> :not_found
    end
    {:reply, reply, state}
  end

  def handle_cast({:register, shard_id, manifest, pid}, state) do
    will_live = case :ets.lookup(:shard_db, shard_id) do
      [{ ^shard_id, _, pid }] -> not Process.alive?(pid)
      _ -> true
    end
    if will_live do
      :ets.insert(:shard_db, {shard_id, manifest, pid})
    else
      GenServer.cast(pid, {:redundant, shard_id})
    end
    {:noreply, state}
  end

  def handle_cast({:interested, peer_id, shards}, state) do
    for shard_id <- shards do
      case :ets.lookup(:shard_db, shard_id) do
        [{ ^shard_id, _, pid }] ->
          :ets.insert(state.peer_shard_db, {peer_id, shard_id})
          GenServer.cast(pid, {:interested, peer_id})
        [] -> nil
      end
    end
    {:noreply, state}
  end



  def handle_cast({:peer_up, pk, pid, ip, port}, state) do
    :ets.insert(:peer_db, {pk, pid, ip, port})
    for {_, msg, _} <- :ets.lookup(state.outbox, pk) do
      GenServer.cast(pid, {:send_msg, msg})
    end
    :ets.delete(state.outbox, pk)
    {:noreply, state}
  end

  def handle_cast({:peer_down, pk, ip, port}, state) do
    :ets.insert(:peer_db, {pk, nil, ip, port})
    {:noreply, state}
  end

  def handle_cast({:connect_and_send, peer_id, msg}, state) do
    case :ets.lookup(:peer_db, peer_id) do
      [{^peer_id, nil, ip, port}] ->
        add_peer(ip, port, state)
        currtime = System.os_time :second
        :ets.insert(state.outbox, {peer_id, msg, currtime})
        outbox_cleanup = [{{:_, :_, :'$1'},
                           [{:<, :'$1', currtime - 60}],
                           [:'$1']}]
        :ets.select_delete(state.outbox, outbox_cleanup)
      _ -> nil
    end
    {:noreply, state}
  end

  def handle_cast({:try_connect, pk_list}, state) do
    for pk <- pk_list do
      case :ets.lookup(:peer_db, pk) do
        [{^pk, nil, ip, port}] ->
          add_peer(ip, port, state)
        _ -> nil
      end
    end
    {:noreply, state}
  end

  def handle_cast({:add_peer, ip, port}, state) do
    add_peer(ip, port, state)
    {:noreply, state}
  end

  defp add_peer(ip, port, state) do
    spawn fn ->
      case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do
        {:ok, client} ->
          {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}})
          :ok = :gen_tcp.controlling_process(client, pid)
        _ ->
          Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped"
        end
    end
  end

  def add_peer(ip, port) do
    GenServer.cast(__MODULE__, {:add_peer, ip, port})
  end

  def send(peer_id, msg) do
    case :ets.lookup(:peer_db, peer_id) do
      [{ ^peer_id, pid, _, _}] when pid != nil->
        GenServer.cast(pid, {:send_msg, msg})
      _ -> 
        GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg})
    end
  end

  def dispatch(peer_id, shard_id, msg) do
    case :ets.lookup(:shard_db, shard_id) do
        [{ ^shard_id, _, pid }] when pid != nil ->
          GenServer.cast(pid, {:msg, peer_id, msg})
        [_] -> nil # TODO restart shard
        [] -> nil  # TODO send not interested
    end
  end
end