aboutsummaryrefslogtreecommitdiff
path: root/lib/manager.ex
blob: 45aae5fe292d70e18d515731ce6b13494af6505f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
defmodule Shard.Manager do
  @moduledoc"""
    Maintains two important tables :

    - :peer_db
      
      List of
        { id, {conn_pid, con_start, conn_n_msg} | nil, ip, port, last_seen }

    - :shard_db
    
      List of
        { id, manifest, shard_pid }

    And also some others :

    - :peer_shard_db

      Mult-list of
        { peer_id, shard_id }

    - :outbox
      
      Multi-list of
        { peer_id, message }

  """

  use GenServer

  def start_link(my_port) do
    GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
  end

  def init(my_port) do
    :ets.new(:peer_db, [:set, :protected, :named_table])
    :ets.new(:shard_db, [:set, :protected, :named_table])
    peer_shard_db = :ets.new(:peer_shard_db, [:bag, :private])
    outbox = :ets.new(:outbox, [:bag, :private])

    {:ok, %{my_port: my_port, peer_shard_db: peer_shard_db, outbox: outbox} }
  end

  def handle_call({:find, shard_id}, _from, state) do
    reply = case :ets.lookup(:shard_db, shard_id) do
      [{ ^shard_id, _, pid }] -> {:ok, pid}
      [] -> :not_found
    end
    {:reply, reply, state}
  end

  def handle_cast({:register, shard_id, manifest, pid}, state) do
    will_live = case :ets.lookup(:shard_db, shard_id) do
      [{ ^shard_id, _, pid }] -> not Process.alive?(pid)
      _ -> true
    end
    if will_live do
      :ets.insert(:shard_db, {shard_id, manifest, pid})
    else
      GenServer.cast(pid, {:redundant, shard_id})
    end
    {:noreply, state}
  end

  def handle_cast({:interested, peer_id, shards}, state) do
    for shard_id <- shards do
      case :ets.lookup(:shard_db, shard_id) do
        [{ ^shard_id, _, pid }] ->
          :ets.insert(state.peer_shard_db, {peer_id, shard_id})
          GenServer.cast(pid, {:interested, peer_id})
        [] -> nil
      end
    end
    {:noreply, state}
  end



  def handle_cast({:peer_up, pk, pid, ip, port}, state) do
    :ets.insert(:peer_db, {pk, pid, ip, port})
    for {_, msg, _} <- :ets.lookup(state.outbox, pk) do
      GenServer.cast(pid, {:send_msg, msg})
    end
    :ets.delete(state.outbox, pk)
    {:noreply, state}
  end

  def handle_cast({:peer_down, pk, ip, port}, state) do
    :ets.insert(:peer_db, {pk, nil, ip, port})
    {:noreply, state}
  end

  def handle_cast({:connect_and_send, peer_id, msg}, state) do
    case :ets.lookup(:peer_db, peer_id) do
      [{^peer_id, nil, ip, port}] ->
        add_peer(ip, port, state)
        currtime = System.os_time :second
        :ets.insert(state.outbox, {peer_id, msg, currtime})
        outbox_cleanup = :ets.fun2ms(fn {_, _, t} when t < currtime - 60 -> true end)
        :ets.select_delete(state.outbox, outbox_cleanup)
      _ -> nil
    end
    {:noreply, state}
  end

  def handle_cast({:try_connect, pk_list}, state) do
    for pk <- pk_list do
      case :ets.lookup(:peer_db, pk) do
        [{^pk, nil, ip, port}] ->
          add_peer(ip, port, state)
        _ -> nil
      end
    end
    {:noreply, state}
  end

  def handle_cast({:add_peer, ip, port}, state) do
    add_peer(ip, port, state)
    {:noreply, state}
  end

  defp add_peer(ip, port, state) do
    spawn fn ->
      {:ok, client} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false])
      {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}})
      :ok = :gen_tcp.controlling_process(client, pid)
    end
  end

  def add_peer(ip, port) do
    GenServer.cast(__MODULE__, {:add_peer, ip, port})
  end

  def send(peer_id, msg) do
    case :ets.lookup(:peer_db, peer_id) do
      [{ ^peer_id, pid, _, _}] when pid != nil->
        GenServer.cast(pid, {:send_msg, msg})
      _ -> 
        GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg})
    end
  end

  def dispatch(peer_id, shard_id, msg) do
    case :ets.lookup(:shard_db, shard_id) do
        [{ ^shard_id, _, pid }] when pid != nil ->
          GenServer.cast(pid, {:msg, peer_id, msg})
        [_] -> nil # TODO restart shard
        [] -> nil  # TODO send not interested
    end
  end
end