1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
defmodule Shard.Manager do
  @moduledoc """
  Keeps track of known peers and of the shards running on this node.

  The server owns four ETS tables.

  Two are named and `:protected`, so other processes can read them directly:

  - `:peer_db` (set) — one row per peer:
    `{peer_id, conn_pid | nil, ip, port}`.
    `conn_pid` is `nil` while the peer is marked as down.
  - `:shard_db` (set) — one row per shard:
    `{shard_id, manifest, shard_pid}`.

  Two are `:private` and only reachable through the server state:

  - `:peer_shard_db` (bag) — `{peer_id, shard_id}` pairs recorded when a peer
    declares interest in a shard.
  - `:outbox` (bag) — `{peer_id, message, queued_at}` rows holding messages
    waiting for a connection, where `queued_at` is `System.os_time(:second)`.
  """

  use GenServer

  # Age, in seconds, after which a queued outbox message is purged.
  @outbox_ttl 60

  @doc """
  Starts the manager, registered under the module name.

  `my_port` is kept in the server state and handed to every outgoing
  connection process started by the manager.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(my_port) do
    GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
  end

  @impl true
  def init(my_port) do
    :ets.new(:peer_db, [:set, :protected, :named_table])
    :ets.new(:shard_db, [:set, :protected, :named_table])
    peer_shard_db = :ets.new(:peer_shard_db, [:bag, :private])
    outbox = :ets.new(:outbox, [:bag, :private])

    {:ok, %{my_port: my_port, peer_shard_db: peer_shard_db, outbox: outbox}}
  end

  # Looks up the process registered for `shard_id`.
  @impl true
  def handle_call({:find, shard_id}, _from, state) do
    reply =
      case :ets.lookup(:shard_db, shard_id) do
        [{^shard_id, _manifest, shard_pid}] -> {:ok, shard_pid}
        [] -> :not_found
      end

    {:reply, reply, state}
  end

  # Registers `pid` as the process for `shard_id`, unless a live process is
  # already registered, in which case `pid` is told it is redundant.
  @impl true
  def handle_cast({:register, shard_id, manifest, pid}, state) do
    already_running? =
      case :ets.lookup(:shard_db, shard_id) do
        [{^shard_id, _manifest, current_pid}] ->
          # Process.alive?/1 raises on anything that is not a pid, so a row
          # without a pid counts as "not running".
          is_pid(current_pid) and Process.alive?(current_pid)

        _ ->
          false
      end

    if already_running? do
      GenServer.cast(pid, {:redundant, shard_id})
    else
      :ets.insert(:shard_db, {shard_id, manifest, pid})
    end

    {:noreply, state}
  end

  # Records the peer's interest in each shard known locally and notifies the
  # shard process. Unknown shards are ignored.
  def handle_cast({:interested, peer_id, shards}, state) do
    Enum.each(shards, fn shard_id ->
      case :ets.lookup(:shard_db, shard_id) do
        [{^shard_id, _manifest, shard_pid}] ->
          :ets.insert(state.peer_shard_db, {peer_id, shard_id})
          GenServer.cast(shard_pid, {:interested, peer_id})

        [] ->
          nil
      end
    end)

    {:noreply, state}
  end

  # Marks the peer as connected through `pid` and flushes its outbox.
  def handle_cast({:peer_up, pk, pid, ip, port}, state) do
    :ets.insert(:peer_db, {pk, pid, ip, port})

    for {_peer, msg, _queued_at} <- :ets.lookup(state.outbox, pk) do
      GenServer.cast(pid, {:send_msg, msg})
    end

    :ets.delete(state.outbox, pk)
    {:noreply, state}
  end

  # Marks the peer as disconnected while remembering its address.
  def handle_cast({:peer_down, pk, ip, port}, state) do
    :ets.insert(:peer_db, {pk, nil, ip, port})
    {:noreply, state}
  end

  # Queues `msg` for a disconnected peer and starts a connection attempt.
  def handle_cast({:connect_and_send, peer_id, msg}, state) do
    case :ets.lookup(:peer_db, peer_id) do
      [{^peer_id, nil, ip, port}] ->
        connect_to_peer(ip, port, state)
        now = System.os_time(:second)
        :ets.insert(state.outbox, {peer_id, msg, now})
        purge_outbox(state.outbox, now - @outbox_ttl)

      [{^peer_id, conn_pid, _ip, _port}] ->
        # The peer came up between the caller's lookup in send/2 and this
        # cast being handled: deliver right away instead of losing the message.
        GenServer.cast(conn_pid, {:send_msg, msg})

      [] ->
        # Unknown peer: there is no address to connect to.
        nil
    end

    {:noreply, state}
  end

  # Starts a connection attempt to every listed peer that is known and down.
  def handle_cast({:try_connect, pk_list}, state) do
    Enum.each(pk_list, fn pk ->
      case :ets.lookup(:peer_db, pk) do
        [{^pk, nil, ip, port}] -> connect_to_peer(ip, port, state)
        _ -> nil
      end
    end)

    {:noreply, state}
  end

  def handle_cast({:add_peer, ip, port}, state) do
    connect_to_peer(ip, port, state)
    {:noreply, state}
  end

  # Connects to `ip:port` in a throwaway process so a slow or failing connect
  # never blocks the manager. On success the socket is handed over to a new
  # SNet.TCPConn child; any failure crashes only the throwaway process.
  defp connect_to_peer(ip, port, state) do
    my_port = state.my_port

    spawn(fn ->
      {:ok, socket} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false])

      {:ok, conn_pid} =
        DynamicSupervisor.start_child(
          Shard.DynamicSupervisor,
          {SNet.TCPConn, %{socket: socket, my_port: my_port}}
        )

      :ok = :gen_tcp.controlling_process(socket, conn_pid)
    end)
  end

  # Deletes every outbox row queued strictly before `cutoff` (in seconds).
  #
  # The match specification is written by hand: :ets.fun2ms/1 only works
  # through an Erlang parse transform (or in the Erlang shell) and exits with
  # badarg when given a compiled Elixir fun.
  defp purge_outbox(outbox, cutoff) do
    expired = [{{:_, :_, :"$1"}, [{:<, :"$1", cutoff}], [true]}]
    :ets.select_delete(outbox, expired)
  end

  @doc """
  Asks the manager to open a connection to the peer at `ip:port`.

  The connection is attempted asynchronously; this function returns `:ok`
  immediately.
  """
  @spec add_peer(term(), term()) :: :ok
  def add_peer(ip, port) do
    GenServer.cast(__MODULE__, {:add_peer, ip, port})
  end

  @doc """
  Sends `msg` to the peer `peer_id`.

  When `:peer_db` lists a connection process for the peer, the message is cast
  to it directly. Otherwise the manager is asked to connect to the peer and
  queue the message until the connection is up.
  """
  @spec send(term(), term()) :: :ok
  def send(peer_id, msg) do
    case :ets.lookup(:peer_db, peer_id) do
      [{^peer_id, conn_pid, _ip, _port}] when conn_pid != nil ->
        GenServer.cast(conn_pid, {:send_msg, msg})

      _ ->
        GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg})
    end
  end

  @doc """
  Forwards `msg`, received from `peer_id`, to the process of shard `shard_id`.

  Returns `:ok` when the message was cast to a shard process, and `nil` when
  the shard is unknown or has no process.
  """
  @spec dispatch(term(), term(), term()) :: :ok | nil
  def dispatch(peer_id, shard_id, msg) do
    case :ets.lookup(:shard_db, shard_id) do
      [{^shard_id, _manifest, shard_pid}] when shard_pid != nil ->
        GenServer.cast(shard_pid, {:msg, peer_id, msg})

      [_] ->
        # TODO restart shard
        nil

      [] ->
        # TODO send not interested
        nil
    end
  end
end
|