From 7b6042205e7c6135fae4e0d21dbf7a5975e8491b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Oct 2018 11:02:35 +0200 Subject: Clean up --- shard/lib/application.ex | 15 ++++++------ shard/lib/cli/cli.ex | 2 +- shard/lib/keys.ex | 3 +++ shard/lib/manager.ex | 59 +++++++++++++++++----------------------------- shard/lib/net/tcpconn.ex | 4 ++-- shard/lib/net/tcpserver.ex | 7 +++--- 6 files changed, 38 insertions(+), 52 deletions(-) (limited to 'shard/lib') diff --git a/shard/lib/application.ex b/shard/lib/application.ex index 6e2c7d5..0b61cc0 100644 --- a/shard/lib/application.ex +++ b/shard/lib/application.ex @@ -11,20 +11,19 @@ defmodule Shard.Application do def start(_type, _args) do import Supervisor.Spec, warn: false - {listen_port, _} = Integer.parse ((System.get_env "PORT") || "4044") - # Define workers and child supervisors to be supervised children = [ + { DynamicSupervisor, strategy: :one_for_one, name: Shard.DynamicSupervisor }, + + # Applications & data store + Shard.Manager, + + # Keys & identities Shard.Keys, { Task, fn -> Shard.Keys.get_any_identity end }, - - { DynamicSupervisor, strategy: :one_for_one, name: Shard.DynamicSupervisor }, # Networking - { SNet.TCPServer, listen_port }, - - # Applications & data store - { Shard.Manager, listen_port }, + SNet.TCPServer, ] # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html diff --git a/shard/lib/cli/cli.ex b/shard/lib/cli/cli.ex index 2e35e45..5f3dc7f 100644 --- a/shard/lib/cli/cli.ex +++ b/shard/lib/cli/cli.ex @@ -72,7 +72,7 @@ defmodule SCLI do defp handle_command(state, ["connect", ipstr, portstr]) do {:ok, ip} = :inet.parse_address (to_charlist ipstr) {port, _} = Integer.parse portstr - Shard.Manager.add_peer(ip, port) + Shard.Manager.add_peer({:inet, ip, port}) state end diff --git a/shard/lib/keys.ex b/shard/lib/keys.ex index b810078..f98deba 100644 --- a/shard/lib/keys.ex +++ b/shard/lib/keys.ex @@ -15,6 +15,9 @@ defmodule Shard.Keys do def init() do :dets.start {:ok, @key_db} = :dets.open_file(@key_db, [type: :set]) + for [pk, _] <- :dets.match(@key_db, {:'$1', :'$2'}) do + Shard.Manifest.start %SApp.Identity.Manifest{pk: pk} + end nil end diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex index 3f5416d..08e14c7 100644 --- a/shard/lib/manager.ex +++ b/shard/lib/manager.ex @@ -34,7 +34,7 @@ defmodule Shard.Manager do Mult-list of { shard_id, peer_info } # TODO: add health info (last seen, ping, etc) - peer_info := {:inet4, ip, port} | {:inet6, ip, port} | {:onion, name} + peer_info := {:inet, ip, port} | {:inet6, ip, port} | {:onion, name} - :shard_procs (not persistent) @@ -67,11 +67,11 @@ defmodule Shard.Manager do @shard_state [Application.get_env(:shard, :data_path), "shard_state"] |> Path.join |> String.to_atom @peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom - def start_link(my_port) do - GenServer.start_link(__MODULE__, my_port, name: __MODULE__) + def start_link(_) do + GenServer.start_link(__MODULE__, nil, name: __MODULE__) end - def init(my_port) do + def init(_) do :dets.open_file(@shard_db, [type: :set]) for [{id, manifest, _pid}] <- :dets.match @shard_db, :"$1" do :dets.insert @shard_db, {id, manifest, nil} @@ -85,7 +85,7 @@ defmodule Shard.Manager do :ets.new(:connections, [:bag, :protected, :named_table]) outbox = :ets.new(:outbox, [:bag, :private]) - {:ok, %{my_port: my_port, outbox: outbox} } + {:ok, %{outbox: outbox} } end def handle_call({:register, shard_id, manifest, pid}, _from, state) do @@ -153,9 +153,11 @@ defmodule Shard.Manager do end def handle_cast({:connect_and_send, peer_info, msg}, state) do - case peer_info do - {:tcp4, ip, port} -> - add_peer(ip, port, state) + case :ets.lookup(:connections, peer_info) do + [{_, pid, _}|_] -> + GenServer.cast(pid, {:send_msg, msg}) + [] -> + add_peer(peer_info) currtime = System.os_time :second :ets.insert(state.outbox, {peer_info, msg, currtime}) outbox_cleanup = [ {{:_, :_, :'$1'}, [{:<, :'$1', currtime - 60}], [true]} ] @@ -164,39 +166,11 @@ defmodule Shard.Manager do {:noreply, state} end - def handle_cast({:try_connect, pk_list}, state) do - for pk <- pk_list do - case :dets.lookup(@peer_db, pk) do - [{^pk, nil, ip, port}] -> - add_peer(ip, port, state) - _ -> nil - end - end - {:noreply, state} - end - - def handle_cast({:add_peer, ip, port}, state) do - add_peer(ip, port, state) - {:noreply, state} - end - def handle_info({:DOWN, _, :process, pid, _}, state) do :ets.match_delete(:shard_procs, {:_, pid}) {:noreply, state} end - defp add_peer(ip, port, state) do - spawn fn -> - case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do - {:ok, client} -> - {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port, is_client: true, auth: nil}}) - :ok = :gen_tcp.controlling_process(client, pid) - _ -> - Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped" - end - end - end - # ====================== # CALLED BY SNet.TcpConn @@ -305,8 +279,17 @@ defmodule Shard.Manager do @doc""" Connect to a peer specified by ip address and port """ - def add_peer(ip, port) do - GenServer.cast(__MODULE__, {:add_peer, ip, port}) + def add_peer({:inet, ip, port}) do + spawn fn -> + case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do + {:ok, client} -> + my_port = Application.get_env(:shard, :port) + {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port, is_client: true, auth: nil}}) + :ok = :gen_tcp.controlling_process(client, pid) + _ -> + Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped" + end + end end @doc""" diff --git a/shard/lib/net/tcpconn.ex b/shard/lib/net/tcpconn.ex index 5dbf42b..67d7f4c 100644 --- a/shard/lib/net/tcpconn.ex +++ b/shard/lib/net/tcpconn.ex @@ -147,7 +147,7 @@ defmodule SNet.TCPConn do {:ok, {addr, port}} = :inet.peername socket state = stream_param |> Map.put(:socket, socket) - |> Map.put(:peer_info, {:tcp4, addr, port}) + |> Map.put(:peer_info, {:inet, addr, port}) |> Map.put(:my_port, state.my_port) GenServer.cast(Shard.Manager, {:peer_up, self(), state.peer_info, state.auth}) @@ -259,7 +259,7 @@ defmodule SNet.TCPConn do {:ok, {addr, port}} = :inet.peername socket state = stream_param |> Map.put(:socket, socket) - |> Map.put(:peer_info, {:tcp4, addr, his_port}) + |> Map.put(:peer_info, {:inet, addr, his_port}) |> Map.put(:my_port, state.my_port) GenServer.cast(Shard.Manager, {:peer_up, self(), state.peer_info, state.auth}) diff --git a/shard/lib/net/tcpserver.ex b/shard/lib/net/tcpserver.ex index 1aa5738..6cc3473 100644 --- a/shard/lib/net/tcpserver.ex +++ b/shard/lib/net/tcpserver.ex @@ -2,14 +2,15 @@ defmodule SNet.TCPServer do require Logger use Task, restart: :permanent - def start_link(port) do - Task.start_link(__MODULE__, :accept, [port]) + def start_link(_) do + Task.start_link(__MODULE__, :accept, []) end @doc """ Starts accepting connections on the given `port`. """ - def accept(port) do + def accept() do + port = Application.get_env(:shard, :port) {:ok, socket} = :gen_tcp.listen(port, [:binary, packet: 2, active: false, reuseaddr: true]) Logger.info "Accepting connections on port #{port}" -- cgit v1.2.3