1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
defmodule Shard.Manager do
  @moduledoc """
  Registry of known peers and locally registered shards.

  The server owns four ETS tables:

    * `:peer_db` (named, protected, set) - `{peer_id, conn_pid | nil, ip, port}`.
      `conn_pid` is `nil` while no connection to the peer is up.
    * `:shard_db` (named, protected, set) - `{shard_id, manifest, shard_pid}`.
    * `peer_shard_db` (private, bag) - `{peer_id, shard_id}`, one entry per
      shard a peer declared interest in.
    * `outbox` (private, bag) - `{peer_id, msg, queued_at}`, messages waiting
      for a connection to `peer_id`; `queued_at` is in seconds (OS time).

  The two named tables are protected, so `send/2` and `dispatch/3` read them
  directly from the calling process; only this server writes to them.
  """

  use GenServer

  # Queued messages older than this many seconds are discarded.
  @outbox_ttl 60

  ## Client API

  @doc """
  Starts the manager, registered under the module name.

  `my_port` is handed to every outgoing `SNet.TCPConn` started by the manager.
  """
  def start_link(my_port) do
    GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
  end

  @doc """
  Asks the manager to open a connection to the peer at `ip`:`port`.
  """
  @spec add_peer(term, term) :: :ok
  def add_peer(ip, port) do
    GenServer.cast(__MODULE__, {:add_peer, ip, port})
  end

  @doc """
  Sends `msg` to the peer `peer_id`.

  If a connection process is recorded in `:peer_db`, the message is cast to it
  directly. Otherwise the manager is asked to connect and queue the message.
  """
  @spec send(term, term) :: :ok
  def send(peer_id, msg) do
    case :ets.lookup(:peer_db, peer_id) do
      [{^peer_id, pid, _, _}] when pid != nil ->
        GenServer.cast(pid, {:send_msg, msg})

      _ ->
        GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg})
    end
  end

  @doc """
  Forwards `msg`, received from `peer_id`, to the process registered for
  `shard_id`. Returns `nil` when no such process is available.
  """
  @spec dispatch(term, term, term) :: :ok | nil
  def dispatch(peer_id, shard_id, msg) do
    case :ets.lookup(:shard_db, shard_id) do
      [{^shard_id, _, pid}] when pid != nil ->
        GenServer.cast(pid, {:msg, peer_id, msg})

      # TODO restart shard
      [_] ->
        nil

      # TODO send not interested
      [] ->
        nil
    end
  end

  ## Server callbacks

  @impl true
  def init(my_port) do
    :ets.new(:peer_db, [:set, :protected, :named_table])
    :ets.new(:shard_db, [:set, :protected, :named_table])
    peer_shard_db = :ets.new(:peer_shard_db, [:bag, :private])
    outbox = :ets.new(:outbox, [:bag, :private])
    {:ok, %{my_port: my_port, peer_shard_db: peer_shard_db, outbox: outbox}}
  end

  @impl true
  def handle_call({:find, shard_id}, _from, state) do
    reply =
      case :ets.lookup(:shard_db, shard_id) do
        [{^shard_id, _, pid}] -> {:ok, pid}
        [] -> :not_found
      end

    {:reply, reply, state}
  end

  @impl true
  def handle_cast({:register, shard_id, manifest, pid}, state) do
    # A registration is refused only while a previously registered process
    # for the same shard is still alive.
    already_running? =
      case :ets.lookup(:shard_db, shard_id) do
        [{^shard_id, _, old_pid}] -> is_pid(old_pid) and Process.alive?(old_pid)
        _ -> false
      end

    if already_running? do
      # Tell the newcomer (not the incumbent) that it is redundant.
      GenServer.cast(pid, {:redundant, shard_id})
    else
      :ets.insert(:shard_db, {shard_id, manifest, pid})
    end

    {:noreply, state}
  end

  def handle_cast({:interested, peer_id, shards}, state) do
    # Shards that are not registered locally are ignored.
    Enum.each(shards, fn shard_id ->
      case :ets.lookup(:shard_db, shard_id) do
        [{^shard_id, _, pid}] ->
          :ets.insert(state.peer_shard_db, {peer_id, shard_id})
          GenServer.cast(pid, {:interested, peer_id})

        [] ->
          nil
      end
    end)

    {:noreply, state}
  end

  def handle_cast({:peer_up, pk, pid, ip, port}, state) do
    :ets.insert(:peer_db, {pk, pid, ip, port})

    # Flush everything that was queued while the peer was unreachable.
    state.outbox
    |> :ets.lookup(pk)
    |> Enum.each(fn {_, msg, _} -> GenServer.cast(pid, {:send_msg, msg}) end)

    :ets.delete(state.outbox, pk)
    {:noreply, state}
  end

  def handle_cast({:peer_down, pk, ip, port}, state) do
    :ets.insert(:peer_db, {pk, nil, ip, port})
    {:noreply, state}
  end

  def handle_cast({:connect_and_send, peer_id, msg}, state) do
    case :ets.lookup(:peer_db, peer_id) do
      [{^peer_id, nil, ip, port}] ->
        add_peer(ip, port, state.my_port)
        now = System.os_time(:second)
        :ets.insert(state.outbox, {peer_id, msg, now})
        purge_outbox(state.outbox, now - @outbox_ttl)

      [{^peer_id, pid, _, _}] ->
        # The peer came up between the caller's lookup in send/2 and this
        # cast being handled: deliver now instead of dropping the message.
        GenServer.cast(pid, {:send_msg, msg})

      _ ->
        # Unknown peer: there is no address to connect to.
        nil
    end

    {:noreply, state}
  end

  def handle_cast({:try_connect, pk_list}, state) do
    # Only peers that are known and currently disconnected are dialed.
    Enum.each(pk_list, fn pk ->
      case :ets.lookup(:peer_db, pk) do
        [{^pk, nil, ip, port}] -> add_peer(ip, port, state.my_port)
        _ -> nil
      end
    end)

    {:noreply, state}
  end

  def handle_cast({:add_peer, ip, port}, state) do
    add_peer(ip, port, state.my_port)
    {:noreply, state}
  end

  ## Helpers

  # Deletes every outbox entry queued strictly before `cutoff` (seconds).
  # The match spec is written out literally: `:ets.fun2ms/1` is an Erlang
  # parse transform and fails with `badarg` when called from compiled Elixir.
  defp purge_outbox(outbox, cutoff) do
    :ets.select_delete(outbox, [{{:_, :_, :"$1"}, [{:<, :"$1", cutoff}], [true]}])
  end

  # Connects to `ip`:`port` in a separate, unlinked process so the manager is
  # never blocked by (or crashed by) a slow or failing connection attempt, then
  # hands the socket over to a freshly started SNet.TCPConn.
  defp add_peer(ip, port, my_port) do
    spawn(fn ->
      {:ok, client} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false])

      {:ok, pid} =
        DynamicSupervisor.start_child(
          Shard.DynamicSupervisor,
          {SNet.TCPConn, %{socket: client, my_port: my_port}}
        )

      :ok = :gen_tcp.controlling_process(client, pid)
    end)
  end
end
|