aboutsummaryrefslogtreecommitdiff
path: root/shard/lib/net/manager.ex
blob: 759c5f0e868c2eba42025244832c97a061db6841 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
defmodule SNet.Manager do
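  @moduledoc"""
  Keeps track of the network connections to other peers.

  - :connections (not persistent)

      ETS table (bag, owned by this process) containing tuples of the form
        { peer_info, pid, auth, :establishing | :established }
      where auth is nil or an %SNet.Auth{} holding my_pk and his_pk
  """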

  use GenServer

  require Logger

  @doc"""
  Start the connection manager and register it under the module name.

  The argument is ignored. Returns whatever `GenServer.start_link/3` returns.
  """
  def start_link(_args) do
    server_opts = [name: __MODULE__]
    GenServer.start_link(__MODULE__, nil, server_opts)
  end

  # GenServer callback. Enables exit trapping, so that exits of linked
  # processes arrive as {:EXIT, pid, reason} messages instead of killing this
  # process, and creates the named ETS table :connections (a protected bag).
  # The server itself keeps no state (nil).
  def init(_args) do
    Process.flag(:trap_exit, true)
    _table = :ets.new(:connections, [:named_table, :protected, :bag])
    {:ok, nil}
  end

  # Synchronous :add_peer request. Obtains a connection pid for peer_info
  # through add_peer_internal/2 and replies with it. The reply is nil when
  # SNet.Addr.is_local?/1 holds for peer_info. A non-nil callback is forwarded
  # to the connection process as a {:callback, callback} cast.
  def handle_call({:add_peer, peer_info, auth, callback}, _from, state) do
    conn = add_peer_internal(peer_info, auth)

    # conn may be nil here; GenServer.cast/2 returns :ok even when the
    # destination does not exist, so nothing is delivered in that case.
    case callback do
      nil -> :ok
      _ -> GenServer.cast(conn, {:callback, callback})
    end

    {:reply, conn, state}
  end

  # Synchronous :accept request. Hands an already accepted client socket to a
  # new SNet.TCPConn process (started with start_link from this process) and
  # replies with its pid. No entry is written to :connections here.
  def handle_call({:accept, client}, _from, state) do
    conn_args = %{socket: client, my_port: Application.get_env(:shard, :port)}
    {:ok, conn} = SNet.TCPConn.start_link(conn_args)
    {:reply, conn, state}
  end

  # Synchronous :peer_up request (presumably sent by a connection process once
  # its link to the peer is usable - confirm against SNet.TCPConn).
  #
  # Replies :redundant when the first matching entry of :connections for this
  # peer belongs to a different pid. With auth == nil any entry for the peer
  # matches; otherwise only entries carrying the same auth do.
  #
  # Otherwise the entry of pid is (re)written as :established, every
  # connection to the peer recorded with a nil auth is asked to close when the
  # new one is authenticated, the ids of all local shards are announced to the
  # peer with an :interested message, and the reply is :ok.
  def handle_call({:peer_up, pid, peer_info, auth}, _from, state) do
    auth_pattern = if is_nil(auth), do: :_, else: auth
    existing = :ets.match(:connections, {peer_info, :"$1", auth_pattern, :_})

    case existing do
      [[other] | _] when other != pid ->
        {:reply, :redundant, state}

      _ ->
        # Drop whatever was recorded for this pid (e.g. its :establishing
        # entry) before recording it as established.
        :ets.match_delete(:connections, {peer_info, pid, :_, :_})
        :ets.insert(:connections, {peer_info, pid, auth, :established})

        if auth != nil do
          :connections
          |> :ets.match({peer_info, :"$1", nil, :_})
          |> Enum.each(fn [unauth_conn] -> GenServer.cast(unauth_conn, :close) end)
        end

        # Send interested message for all our shards
        shard_ids = for {id, _, _} <- Shard.Manager.list_shards(), do: id
        GenServer.cast(pid, {:send_msg, {:interested, shard_ids}})

        {:reply, :ok, state}
    end
  end

  # Asynchronous :connect_and_send request. Gets a connection to peer_info via
  # add_peer_internal/2 (reusing or opening one) and casts {:send_msg, msg} to
  # it. add_peer_internal/2 may return nil, in which case GenServer.cast/2
  # still returns :ok and the message is not delivered.
  def handle_cast({:connect_and_send, peer_info, auth, msg}, state) do
    peer_info
    |> add_peer_internal(auth)
    |> GenServer.cast({:send_msg, msg})

    {:noreply, state}
  end

  # Exit notification of a linked process (exits are trapped in init/1):
  # remove every :connections entry that refers to the terminated pid.
  def handle_info({:EXIT, dead_pid, _reason}, state) do
    entry_pattern = {:_, dead_pid, :_, :_}
    :ets.match_delete(:connections, entry_pattern)
    {:noreply, state}
  end

  # Returns the pid of a connection to peer_info, or nil when
  # SNet.Addr.is_local?/1 holds for peer_info.
  #
  # An entry of :connections is reused when it matches the peer and the
  # requested auth (any entry for the peer when auth is nil). Otherwise a new
  # SNet.TCPConn is started and recorded as :establishing. Must run inside the
  # manager process, since the table is protected.
  defp add_peer_internal(peer_info, auth) do
    cond do
      SNet.Addr.is_local?(peer_info) ->
        nil

      true ->
        auth_pattern = if is_nil(auth), do: :_, else: auth

        case :ets.match(:connections, {peer_info, :"$1", auth_pattern, :_}) do
          [[existing] | _] ->
            existing

          [] ->
            conn_args = %{
              connect_to: peer_info,
              my_port: Application.get_env(:shard, :port),
              auth: auth
            }

            {:ok, conn} = SNet.TCPConn.start_link(conn_args)
            :ets.insert(:connections, {peer_info, conn, auth, :establishing})
            conn
        end
    end
  end

  # =========
  # INTERFACE
  # =========

  @doc"""
  Ask the manager for a connection to the peer described by `peer_info`,
  opening one if no suitable connection is known yet.

  Options (all optional):

    - `:auth` - authentication to use for the connection
    - `:callback` - forwarded to the connection process when given

  Returns the pid of the connection process, or `nil` when the manager
  considers `peer_info` to be local.
  """
  def add_peer(peer_info, opts \\ []) do
    request = {:add_peer, peer_info, opts[:auth], opts[:callback]}
    GenServer.call(__MODULE__, request)
  end

  @doc"""
  Return every entry of the `:connections` table, i.e. a list of
  `{peer_info, pid, auth, state}` tuples. Connections that are still being
  established are included.
  """
  def list_connections(), do: :ets.tab2list(:connections)

  @doc"""
  Return the connections known for a given peer as a list of `{pid, auth}`
  pairs. Several connections to the same peer may exist with different auth.
  """
  def get_connections_to(peer_info) do
    :connections
    |> :ets.lookup(peer_info)
    |> Enum.flat_map(fn
      {^peer_info, conn, auth, _state} -> [{conn, auth}]
      _other -> []
    end)
  end

  @doc"""
  Return the pids of the connections to a given peer that match a given auth
  spec.

  `my_auth` and `his_auth` are each either a single key or an enumerable
  (e.g. a list) of acceptable keys. A connection is returned when its
  `my_pk` matches `my_auth` and its `his_pk` matches `his_auth`. Connections
  whose auth is not an `%SNet.Auth{}` (e.g. `nil`) are never returned.
  """
  def get_auth_connections_to(peer_info, my_auth, his_auth) do
    for {^peer_info, pid, %SNet.Auth{my_pk: my_pk, his_pk: his_pk}, _} <- :ets.lookup(:connections, peer_info),
      pk_matches?(my_pk, my_auth),
      pk_matches?(his_pk, his_auth),
    do: pid
  end

  # True when pk equals spec, or when spec is an enumerable containing pk.
  # Membership is only tested on enumerables: `pk in spec` goes through
  # Enum.member?/2, which raises Protocol.UndefinedError for a scalar spec
  # (binary, nil, ...) that merely differs from pk.
  defp pk_matches?(pk, spec) do
    pk == spec or (Enumerable.impl_for(spec) != nil and pk in spec)
  end

  @doc"""
  Send message to a peer specified by peer info.

  The first connection recorded for the peer is used, whatever its auth or
  state. When none is recorded, the manager is asked to open an
  unauthenticated connection and send the message through it.
  Always returns `:ok`.
  """
  def send(peer_info, msg) do
    case :ets.lookup(:connections, peer_info) do
      [] ->
        GenServer.cast(__MODULE__, {:connect_and_send, peer_info, nil, msg})

      [{^peer_info, conn, _auth, _state} | _rest] ->
        GenServer.cast(conn, {:send_msg, msg})
    end
  end

  @doc"""
  Send message to a peer specified by peer info, over a connection recorded
  with exactly the given auth.

  When no such connection is recorded, the manager is asked to open one with
  that auth and send the message through it. Always returns `:ok`.
  """
  def send_auth(peer_info, auth, msg) do
    entry_pattern = {peer_info, :"$1", auth, :_}

    case :ets.match(:connections, entry_pattern) do
      [] ->
        GenServer.cast(__MODULE__, {:connect_and_send, peer_info, auth, msg})

      [[conn] | _rest] ->
        GenServer.cast(conn, {:send_msg, msg})
    end
  end

  @doc"""
  Send message over the connection handled by the process `pid`.
  Always returns `:ok`.
  """
  def send_pid(pid, msg), do: GenServer.cast(pid, {:send_msg, msg})
end