diff options
author | Alex Auvolat <alex@adnab.me> | 2018-10-11 15:11:52 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2018-10-11 15:11:52 +0200 |
commit | 1646bc57eae9880fd408d23ca692364dc6fd6442 (patch) | |
tree | 0f65d47498f33772152b20ee082d34e218f16df4 /shard | |
parent | eb8c949551ffb8b3600357d7ff2bebe750af96e5 (diff) | |
download | shard-1646bc57eae9880fd408d23ca692364dc6fd6442.tar.gz shard-1646bc57eae9880fd408d23ca692364dc6fd6442.zip |
Move somme functionnality to SNet.Manager
Diffstat (limited to 'shard')
-rw-r--r-- | shard/lib/app/chat.ex | 6 | ||||
-rw-r--r-- | shard/lib/app/identity.ex | 6 | ||||
-rw-r--r-- | shard/lib/app/pagestore.ex | 8 | ||||
-rw-r--r-- | shard/lib/application.ex | 7 | ||||
-rw-r--r-- | shard/lib/cli/cli.ex | 2 | ||||
-rw-r--r-- | shard/lib/manager.ex | 103 | ||||
-rw-r--r-- | shard/lib/net/group.ex | 17 | ||||
-rw-r--r-- | shard/lib/net/manager.ex | 119 | ||||
-rw-r--r-- | shard/lib/net/tcpconn.ex | 25 | ||||
-rw-r--r-- | shard/lib/net/tcpserver.ex | 3 |
10 files changed, 158 insertions, 138 deletions
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex index 051dfef..53767ef 100644 --- a/shard/lib/app/chat.ex +++ b/shard/lib/app/chat.ex @@ -118,7 +118,7 @@ defmodule SApp.Chat do end end - notif = {:append, prev_root, msgitem, mst.root} + notif = {state.id, nil, {:append, prev_root, msgitem, mst.root}} SNet.Group.broadcast(state.group, notif) {:noreply, state} @@ -130,7 +130,7 @@ defmodule SApp.Chat do """ def handle_cast({:interested, conn_pid, auth}, state) do if SNet.Group.in_group?(state.group, conn_pid, auth) do - Shard.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root}}) + SNet.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root}}) end {:noreply, state} end @@ -157,7 +157,7 @@ defmodule SApp.Chat do else state = case msg do {:get_manifest} -> - Shard.Manager.send_pid(conn_pid, {state.id, nil, {:manifest, state.manifest}}) + SNet.Manager.send_pid(conn_pid, {state.id, nil, {:manifest, state.manifest}}) state {:append, prev_root, msgitem, new_root} -> # Append message: one single mesage has arrived diff --git a/shard/lib/app/identity.ex b/shard/lib/app/identity.ex index d2748a1..391d37e 100644 --- a/shard/lib/app/identity.ex +++ b/shard/lib/app/identity.ex @@ -84,14 +84,14 @@ defmodule SApp.Identity do end def handle_cast(:init_pull, state) do - for {_, pid, _} <- Shard.Manager.list_connections do + for {_, pid, _} <- SNet.Manager.list_connections do GenServer.cast(pid, {:send_msg, {:interested, [state.id]}}) end {:noreply, state} end def handle_cast({:interested, peer_pid, _auth}, state) do - Shard.Manager.send_pid(peer_pid, {state.id, nil, {:update, SData.SignRev.signed(state.state)}}) + SNet.Manager.send_pid(peer_pid, {state.id, nil, {:update, SData.SignRev.signed(state.state)}}) {:noreply, state} end @@ -115,7 +115,7 @@ defmodule SApp.Identity do def bcast_state(state, exclude \\ []) do for peer_id <- Shard.Manager.get_shard_peers(state.id) do if not Enum.member? exclude, peer_id do - Shard.Manager.send(peer_id, {state.id, nil, {:update, SData.SignRev.signed(state.state)}}) + SNet.Manager.send(peer_id, {state.id, nil, {:update, SData.SignRev.signed(state.state)}}) end end end diff --git a/shard/lib/app/pagestore.ex b/shard/lib/app/pagestore.ex index e09c513..ad18eac 100644 --- a/shard/lib/app/pagestore.ex +++ b/shard/lib/app/pagestore.ex @@ -107,7 +107,7 @@ defmodule SApp.PageStore do case prefer_ask do [_|_] -> for peer <- prefer_ask do - Shard.Manager.send_pid(peer, {state.shard_id, state.path, {:get, key}}) + SNet.Manager.send_pid(peer, {state.shard_id, state.path, {:get, key}}) end _ -> ask_random_peers(state, key) @@ -128,9 +128,9 @@ defmodule SApp.PageStore do {:get, key} -> case :dets.lookup state.store, key do [{_, _, bin}] -> - Shard.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:info, key, bin}}) + SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:info, key, bin}}) _ -> - Shard.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:not_found, key}}) + SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:not_found, key}}) end state {:info, hash, bin} -> @@ -251,7 +251,7 @@ defmodule SApp.PageStore do |> Enum.shuffle |> Enum.take(3) for peer <- peers do - Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) + SNet.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) end end diff --git a/shard/lib/application.ex b/shard/lib/application.ex index 0b61cc0..0daf48e 100644 --- a/shard/lib/application.ex +++ b/shard/lib/application.ex @@ -17,13 +17,14 @@ defmodule Shard.Application do # Applications & data store Shard.Manager, + + # Networking + SNet.Manager, + SNet.TCPServer, # Keys & identities Shard.Keys, { Task, fn -> Shard.Keys.get_any_identity end }, - - # Networking - SNet.TCPServer, ] # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html diff --git a/shard/lib/cli/cli.ex b/shard/lib/cli/cli.ex index 5f3dc7f..f7e8525 100644 --- a/shard/lib/cli/cli.ex +++ b/shard/lib/cli/cli.ex @@ -72,7 +72,7 @@ defmodule SCLI do defp handle_command(state, ["connect", ipstr, portstr]) do {:ok, ip} = :inet.parse_address (to_charlist ipstr) {port, _} = Integer.parse portstr - Shard.Manager.add_peer({:inet, ip, port}) + SNet.Manager.add_peer({:inet, ip, port}) state end diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex index 1e089ba..9def229 100644 --- a/shard/lib/manager.ex +++ b/shard/lib/manager.ex @@ -40,23 +40,6 @@ defmodule Shard.Manager do List of { {id, path}, pid } - - - :connections (not persistent) - - List of - { peer_info, pid, nil | {my_pk, his_pk} } - - And an internal table : - - - :outbox (not persistent) - - Multi-list of - { dest_peer_info, message, time_inserted } - - dest := peer_info - - No support for messages on authenticated channels - """ use GenServer @@ -82,10 +65,8 @@ defmodule Shard.Manager do :dets.open_file(@peer_db, [type: :bag]) :ets.new(:shard_procs, [:set, :protected, :named_table]) - :ets.new(:connections, [:bag, :protected, :named_table]) - outbox = :ets.new(:outbox, [:bag, :private]) - {:ok, %{outbox: outbox} } + {:ok, nil} end def handle_call({:register, shard_id, manifest, pid}, _from, state) do @@ -131,41 +112,6 @@ defmodule Shard.Manager do {:noreply, state} end - def handle_cast({:peer_up, pid, peer_info, auth}, state) do - :ets.insert(:connections, {peer_info, pid, auth}) - - # Send interested message for all our shards - id_list = (for [{id, _, _}] <- :dets.match(@shard_db, :"$1"), do: id) - GenServer.cast(pid, {:send_msg, {:interested, id_list}}) - - # Send queued messages - for {_, msg, _} <- :ets.lookup(state.outbox, peer_info) do - GenServer.cast(pid, {:send_msg, msg}) - end - :ets.delete(state.outbox, peer_info) - - {:noreply, state} - end - - def handle_cast({:peer_down, peer_pid, peer_info, auth}, state) do - :ets.match_delete(:connections, {peer_info, peer_pid, auth}) - {:noreply, state} - end - - def handle_cast({:connect_and_send, peer_info, msg}, state) do - case :ets.lookup(:connections, peer_info) do - [{_, pid, _}|_] -> - GenServer.cast(pid, {:send_msg, msg}) - [] -> - add_peer(peer_info) - currtime = System.os_time :second - :ets.insert(state.outbox, {peer_info, msg, currtime}) - outbox_cleanup = [ {{:_, :_, :'$1'}, [{:<, :'$1', currtime - 60}], [true]} ] - :ets.select_delete(state.outbox, outbox_cleanup) - end - {:noreply, state} - end - def handle_info({:DOWN, _, :process, pid, _}, state) do :ets.match_delete(:shard_procs, {:_, pid}) {:noreply, state} @@ -212,26 +158,6 @@ defmodule Shard.Manager do # ================ @doc""" - Send message to a peer specified by peer id - """ - def send_pid(pid, msg) do - GenServer.cast(pid, {:send_msg, msg}) - end - - @doc""" - Send message to a peer specified by peer info. - Opens a connection if necessary. - """ - def send(peer_info, msg) do - case :ets.lookup(:connections, peer_info) do - [{^peer_info, pid, _auth}|_] -> - GenServer.cast(pid, {:send_msg, msg}) - [] -> - GenServer.cast(__MODULE__, {:connect_and_send, peer_info, msg}) - end - end - - @doc""" Register a process as the main process for a shard. Returns either :ok or :redundant, in which case the process must exit. @@ -277,22 +203,6 @@ defmodule Shard.Manager do # ================ @doc""" - Connect to a peer specified by ip address and port - """ - def add_peer({:inet, ip, port}) do - spawn fn -> - case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do - {:ok, client} -> - my_port = Application.get_env(:shard, :port) - {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port, is_client: true, auth: nil}}) - :ok = :gen_tcp.controlling_process(client, pid) - _ -> - Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped" - end - end - end - - @doc""" Returns the pid for a shard if it exists """ def find_proc(shard_id) do @@ -308,15 +218,4 @@ defmodule Shard.Manager do def list_shards() do for [x] <- :dets.match(@shard_db, :"$1"), do: x end - - @doc""" - Return the list of all connected peers - """ - def list_connections() do - for [x] <- :ets.match(:connections, :"$1"), do: x - end - - def get_connections_to(peer_info) do - for {^peer_info, pid, auth} <- :ets.lookup(:connections, peer_info), do: {pid, auth} - end end diff --git a/shard/lib/net/group.ex b/shard/lib/net/group.ex index d2c2537..692438a 100644 --- a/shard/lib/net/group.ex +++ b/shard/lib/net/group.ex @@ -36,12 +36,12 @@ defmodule SNet.PubShardGroup do def init_lookup(%SNet.PubShardGroup{id: id}, _notify_to) do # For now: ask all currently connected peers and connect to new peers we know of spawn fn -> - for {_, pid, _} <- Shard.Manager.list_connections do + for {_, pid, _} <- SNet.Manager.list_connections do GenServer.cast(pid, {:send_msg, {:interested, [id]}}) end for peer_info <- Shard.Manager.get_shard_peers id do - if Shard.Manager.get_connections_to peer_info == [] do - Shard.Manager.add_peer(peer_info) # TODO callback when connected + if SNet.Manager.get_connections_to peer_info == [] do + SNet.Manager.add_peer(peer_info) # TODO callback when connected end end end @@ -49,9 +49,10 @@ defmodule SNet.PubShardGroup do end def get_connections(%SNet.PubShardGroup{id: id}) do - for peer_info <- Shard.Manager.get_shard_peers(id), - [{pid, _auth}|_] = Shard.Manager.get_connections_to(peer_info), - do: pid + Shard.Manager.get_shard_peers(id) + |> Enum.map(&(SNet.Manager.get_connections_to(&1))) + |> Enum.filter(&(&1 != [])) + |> Enum.map(fn [{pid, _auth}|_] -> pid end) end def broadcast(group, msg, nmax) do @@ -63,10 +64,10 @@ defmodule SNet.PubShardGroup do |> Enum.count if nmax - nsent > 0 do Shard.Manager.get_shard_peers(id) - |> Enum.filter(&(Shard.Manager.get_connections_to(&1) == [])) + |> Enum.filter(&(SNet.Manager.get_connections_to(&1) == [])) |> Enum.shuffle |> Enum.take(nmax - nsent) - |> Enum.map(&(Shard.Manager.send(&1, msg))) + |> Enum.map(&(SNet.Manager.send(&1, msg))) end end diff --git a/shard/lib/net/manager.ex b/shard/lib/net/manager.ex new file mode 100644 index 0000000..17d6e06 --- /dev/null +++ b/shard/lib/net/manager.ex @@ -0,0 +1,119 @@ +defmodule SNet.Manager do + @moduledoc""" + - :connections (not persistent) + + List of + { peer_info, pid, nil | {my_pk, his_pk} } + """ + + use GenServer + + require Logger + + def start_link(_) do + GenServer.start_link(__MODULE__, nil, name: __MODULE__) + end + + def init(_) do + Process.flag(:trap_exit, true) + + :ets.new(:connections, [:bag, :protected, :named_table]) + + {:ok, nil} + end + + def handle_call({:add_peer, peer_info}, _from, state) do + pid = add_peer_internal(peer_info) + {:reply, pid, state} + end + + def handle_call({:accept, client}, _from, state) do + my_port = Application.get_env(:shard, :port) + {:ok, pid} = SNet.TCPConn.start_link(%{socket: client, my_port: my_port}) + {:reply, pid, state} + end + + def handle_call({:peer_up, pid, peer_info, auth}, _from, state) do + case :ets.match(:connections, {peer_info, :_, auth}) do + [{_, pid2, _}] when pid2 != pid -> + {:reply, :redundant, state} + _ -> + :ets.insert(:connections, {peer_info, pid, auth}) + + # Send interested message for all our shards + id_list = (for {id, _, _} <- Shard.Manager.list_shards(), do: id) + GenServer.cast(pid, {:send_msg, {:interested, id_list}}) + + {:reply, :ok, state} + end + end + + def handle_cast({:connect_and_send, peer_info, msg}, state) do + pid = add_peer_internal(peer_info) + GenServer.cast(pid, {:send_msg, msg}) + {:noreply, state} + end + + def handle_info({:EXIT, pid, _reason}, state) do + :ets.match_delete(:connections, {:_, pid, :_}) + {:noreply, state} + end + + defp add_peer_internal(peer_info) do + case :ets.lookup(:connections, peer_info) do + [{_, pid, _}|_] -> + pid + [] -> + my_port = Application.get_env(:shard, :port) + {:ok, pid} = SNet.TCPConn.start_link(%{connect_to: peer_info, my_port: my_port, auth: nil}) + :ets.insert(:connections, {peer_info, pid, nil}) + pid + end + end + + # ========= + # INTERFACE + # ========= + + @doc""" + Connect to a peer specified by ip address and port + """ + def add_peer(peer_info) do + GenServer.call(__MODULE__, {:add_peer, peer_info}) + end + + @doc""" + Return the list of all connected peers + """ + def list_connections() do + for [x] <- :ets.match(:connections, :"$1"), do: x + end + + @doc""" + Return the list of connections to a given peer, possibly with different auth + """ + def get_connections_to(peer_info) do + for {^peer_info, pid, auth} <- :ets.lookup(:connections, peer_info), do: {pid, auth} + end + + @doc""" + Send message to a peer specified by peer info. + Opens a connection if necessary. + """ + def send(peer_info, msg) do + case :ets.lookup(:connections, peer_info) do + [{^peer_info, pid, _auth}|_] -> + GenServer.cast(pid, {:send_msg, msg}) + [] -> + GenServer.cast(__MODULE__, {:connect_and_send, peer_info, msg}) + end + end + + @doc""" + Send message to a peer specified by peer id + """ + def send_pid(pid, msg) do + GenServer.cast(pid, {:send_msg, msg}) + end +end + diff --git a/shard/lib/net/tcpconn.ex b/shard/lib/net/tcpconn.ex index 67d7f4c..476c426 100644 --- a/shard/lib/net/tcpconn.ex +++ b/shard/lib/net/tcpconn.ex @@ -33,23 +33,21 @@ defmodule SNet.TCPConn do end def init(state) do - if state.is_client do - GenServer.cast(self(), :client_handshake) - else + if Map.has_key?(state, :socket) do GenServer.cast(self(), :server_handshake) + else + GenServer.cast(self(), :client_handshake) end - {:ok, state} end - def handle_call(:get_peer_info, _from, state) do {:reply, state.peer_info, state} end - def handle_cast(:client_handshake, state) do - socket = state.socket + {:inet, ip, port} = state.connect_to + {:ok, socket} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) net_key = Application.get_env(:shard, :network_key) %{public: cli_eph_pk, secret: cli_eph_sk} = :enacl.box_keypair @@ -150,9 +148,11 @@ defmodule SNet.TCPConn do |> Map.put(:peer_info, {:inet, addr, port}) |> Map.put(:my_port, state.my_port) - GenServer.cast(Shard.Manager, {:peer_up, self(), state.peer_info, state.auth}) - Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}" + if GenServer.call(SNet.Manager, {:peer_up, self(), state.peer_info, state.auth}) == :redundant do + exit :redundant + end + Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}" {:noreply, state} end @@ -262,9 +262,11 @@ defmodule SNet.TCPConn do |> Map.put(:peer_info, {:inet, addr, his_port}) |> Map.put(:my_port, state.my_port) - GenServer.cast(Shard.Manager, {:peer_up, self(), state.peer_info, state.auth}) - Logger.info "New peer: #{print_id state} at #{inspect state.peer_info} (#{port})" + if GenServer.call(SNet.Manager, {:peer_up, self(), state.peer_info, state.auth}) == :redundant do + exit :redundant + end + Logger.info "New peer: #{print_id state} at #{inspect state.peer_info} (#{port})" {:noreply, state} end @@ -284,7 +286,6 @@ defmodule SNet.TCPConn do def handle_info({:tcp_closed, _socket}, state) do Logger.info "Disconnected: #{print_id state} at #{inspect state.peer_info}" - GenServer.cast(Shard.Manager, {:peer_down, self(), state.peer_info, state.auth}) exit(:normal) end diff --git a/shard/lib/net/tcpserver.ex b/shard/lib/net/tcpserver.ex index 6cc3473..d7326ad 100644 --- a/shard/lib/net/tcpserver.ex +++ b/shard/lib/net/tcpserver.ex @@ -19,8 +19,7 @@ defmodule SNet.TCPServer do defp loop_acceptor(socket, my_port) do {:ok, client} = :gen_tcp.accept(socket) - {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, - {SNet.TCPConn, %{socket: client, my_port: my_port, is_client: false}}) + pid = GenServer.call(SNet.Manager, {:accept, client}) :ok = :gen_tcp.controlling_process(client, pid) loop_acceptor(socket, my_port) end |