aboutsummaryrefslogtreecommitdiff
path: root/lib/net
diff options
context:
space:
mode:
Diffstat (limited to 'lib/net')
-rw-r--r--lib/net/manager.ex9
-rw-r--r--lib/net/tcpconn.ex34
-rw-r--r--lib/net/tcpserver.ex2
3 files changed, 17 insertions, 28 deletions
diff --git a/lib/net/manager.ex b/lib/net/manager.ex
index e5eb12d..4b1ce94 100644
--- a/lib/net/manager.ex
+++ b/lib/net/manager.ex
@@ -40,7 +40,12 @@ defmodule SNet.Manager do
{:noreply, state}
end
- def handle_call({:get_connections, pk_list}, state) do
+ def handle_call(:get_all, _from, state) do
+ pid_list = (for {_, {pid, _, _}} <- state.peers, pid != nil, do: pid)
+ {:reply, pid_list, state}
+ end
+
+ def handle_call({:get_connections, pk_list}, _from, state) do
pid_list = (for pk <- pk_list, Map.has_key?(state.peers, pk), do: state.peers[pk])
|> Enum.map(fn {pid, _, _} -> pid end)
|> Enum.filter(&(&1 != nil))
@@ -53,7 +58,7 @@ defmodule SNet.Manager do
def add_peer(ip, port, my_port) do
{:ok, client} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false])
- {:ok, pid} = DynamicSupervisor.start_child(SNet.ConnSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port}})
+ {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port}})
:ok = :gen_tcp.controlling_process(client, pid)
pid
end
diff --git a/lib/net/tcpconn.ex b/lib/net/tcpconn.ex
index 301e931..64c85e9 100644
--- a/lib/net/tcpconn.ex
+++ b/lib/net/tcpconn.ex
@@ -53,10 +53,9 @@ defmodule SNet.TCPConn do
addr: addr,
port: port
}
- GenServer.cast(SNet.Manager, {:peer_up, self(), cli_pkey, addr, his_port})
+ GenServer.cast(SNet.Manager, {:peer_up, cli_pkey, self(), addr, his_port})
Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}"
-
- GenServer.cast(self(), :init_push)
+ GenServer.cast(self(), :init_pull)
{:noreply, state}
end
@@ -66,8 +65,9 @@ defmodule SNet.TCPConn do
{:noreply, state}
end
- def handle_cast(:init_push, state) do
- push_messages(state, nil, 10)
+ def handle_cast(:init_pull, state) do
+ id_list = (for {id, _} <- GenServer.call(Shard.Manager, :list), do: id)
+ send_msg(state, {:interested, id_list})
{:noreply, state}
end
@@ -102,28 +102,12 @@ defmodule SNet.TCPConn do
exit(:normal)
end
- defp push_messages(state, start, num) do
- case GenServer.call(SApp.Chat.Log, {:read, start, num}) do
- {:ok, list, rest} ->
- send_msg(state, {:info, start, list, rest})
- _ -> nil
- end
+ defp handle_packet({:interested, shards}, state) do
+ GenServer.cast(Shard.Manager, {:interested, state.his_pkey, self(), shards})
end
- defp handle_packet(msg, state) do
- # Logger.info "Message: #{inspect msg}"
- case msg do
- :get_top -> push_messages(state, nil, 10)
- {:get, start} -> push_messages(state, start, 20)
- {:info, _start, list, rest} ->
- if rest != nil and not GenServer.call(SApp.Chat.Log, {:has, rest}) do
- send_msg(state, {:get, rest})
- end
- spawn_link(fn ->
- Process.sleep 1000
- GenServer.cast(SApp.Chat.Log, {:insert_many, list, &SApp.Chat.msg_callback/1})
- end)
- end
+ defp handle_packet({shard, msg}, state) do
+ GenServer.cast(Shard.Manager, {:dispatch, state.his_pkey, self(), shard, msg})
end
defp print_id(state) do
diff --git a/lib/net/tcpserver.ex b/lib/net/tcpserver.ex
index 7c758c1..46552a4 100644
--- a/lib/net/tcpserver.ex
+++ b/lib/net/tcpserver.ex
@@ -18,7 +18,7 @@ defmodule SNet.TCPServer do
defp loop_acceptor(socket, my_port) do
{:ok, client} = :gen_tcp.accept(socket)
- {:ok, pid} = DynamicSupervisor.start_child(SNet.ConnSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port}})
+ {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port}})
:ok = :gen_tcp.controlling_process(client, pid)
loop_acceptor(socket, my_port)
end