aboutsummaryrefslogtreecommitdiff
path: root/shard/lib
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2018-10-11 11:02:35 +0200
committerAlex Auvolat <alex@adnab.me>2018-10-11 11:02:35 +0200
commit7b6042205e7c6135fae4e0d21dbf7a5975e8491b (patch)
tree58fc563458702fb456729b79802eacb547a8fc66 /shard/lib
parenta2f678dc510e642479d61b81148a433edd7b76fe (diff)
downloadshard-7b6042205e7c6135fae4e0d21dbf7a5975e8491b.tar.gz
shard-7b6042205e7c6135fae4e0d21dbf7a5975e8491b.zip
Clean up
Diffstat (limited to 'shard/lib')
-rw-r--r--shard/lib/application.ex15
-rw-r--r--shard/lib/cli/cli.ex2
-rw-r--r--shard/lib/keys.ex3
-rw-r--r--shard/lib/manager.ex59
-rw-r--r--shard/lib/net/tcpconn.ex4
-rw-r--r--shard/lib/net/tcpserver.ex7
6 files changed, 38 insertions, 52 deletions
diff --git a/shard/lib/application.ex b/shard/lib/application.ex
index 6e2c7d5..0b61cc0 100644
--- a/shard/lib/application.ex
+++ b/shard/lib/application.ex
@@ -11,20 +11,19 @@ defmodule Shard.Application do
def start(_type, _args) do
import Supervisor.Spec, warn: false
- {listen_port, _} = Integer.parse ((System.get_env "PORT") || "4044")
-
# Define workers and child supervisors to be supervised
children = [
+ { DynamicSupervisor, strategy: :one_for_one, name: Shard.DynamicSupervisor },
+
+ # Applications & data store
+ Shard.Manager,
+
+ # Keys & identities
Shard.Keys,
{ Task, fn -> Shard.Keys.get_any_identity end },
-
- { DynamicSupervisor, strategy: :one_for_one, name: Shard.DynamicSupervisor },
# Networking
- { SNet.TCPServer, listen_port },
-
- # Applications & data store
- { Shard.Manager, listen_port },
+ SNet.TCPServer,
]
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
diff --git a/shard/lib/cli/cli.ex b/shard/lib/cli/cli.ex
index 2e35e45..5f3dc7f 100644
--- a/shard/lib/cli/cli.ex
+++ b/shard/lib/cli/cli.ex
@@ -72,7 +72,7 @@ defmodule SCLI do
defp handle_command(state, ["connect", ipstr, portstr]) do
{:ok, ip} = :inet.parse_address (to_charlist ipstr)
{port, _} = Integer.parse portstr
- Shard.Manager.add_peer(ip, port)
+ Shard.Manager.add_peer({:inet, ip, port})
state
end
diff --git a/shard/lib/keys.ex b/shard/lib/keys.ex
index b810078..f98deba 100644
--- a/shard/lib/keys.ex
+++ b/shard/lib/keys.ex
@@ -15,6 +15,9 @@ defmodule Shard.Keys do
def init() do
:dets.start
{:ok, @key_db} = :dets.open_file(@key_db, [type: :set])
+ for [pk, _] <- :dets.match(@key_db, {:'$1', :'$2'}) do
+ Shard.Manifest.start %SApp.Identity.Manifest{pk: pk}
+ end
nil
end
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex
index 3f5416d..08e14c7 100644
--- a/shard/lib/manager.ex
+++ b/shard/lib/manager.ex
@@ -34,7 +34,7 @@ defmodule Shard.Manager do
Mult-list of
{ shard_id, peer_info } # TODO: add health info (last seen, ping, etc)
- peer_info := {:inet4, ip, port} | {:inet6, ip, port} | {:onion, name}
+ peer_info := {:inet, ip, port} | {:inet6, ip, port} | {:onion, name}
- :shard_procs (not persistent)
@@ -67,11 +67,11 @@ defmodule Shard.Manager do
@shard_state [Application.get_env(:shard, :data_path), "shard_state"] |> Path.join |> String.to_atom
@peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom
- def start_link(my_port) do
- GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
+ def start_link(_) do
+ GenServer.start_link(__MODULE__, nil, name: __MODULE__)
end
- def init(my_port) do
+ def init(_) do
:dets.open_file(@shard_db, [type: :set])
for [{id, manifest, _pid}] <- :dets.match @shard_db, :"$1" do
:dets.insert @shard_db, {id, manifest, nil}
@@ -85,7 +85,7 @@ defmodule Shard.Manager do
:ets.new(:connections, [:bag, :protected, :named_table])
outbox = :ets.new(:outbox, [:bag, :private])
- {:ok, %{my_port: my_port, outbox: outbox} }
+ {:ok, %{outbox: outbox} }
end
def handle_call({:register, shard_id, manifest, pid}, _from, state) do
@@ -153,9 +153,11 @@ defmodule Shard.Manager do
end
def handle_cast({:connect_and_send, peer_info, msg}, state) do
- case peer_info do
- {:tcp4, ip, port} ->
- add_peer(ip, port, state)
+ case :ets.lookup(:connections, peer_info) do
+ [{_, pid, _}|_] ->
+ GenServer.cast(pid, {:send_msg, msg})
+ [] ->
+ add_peer(peer_info)
currtime = System.os_time :second
:ets.insert(state.outbox, {peer_info, msg, currtime})
outbox_cleanup = [ {{:_, :_, :'$1'}, [{:<, :'$1', currtime - 60}], [true]} ]
@@ -164,39 +166,11 @@ defmodule Shard.Manager do
{:noreply, state}
end
- def handle_cast({:try_connect, pk_list}, state) do
- for pk <- pk_list do
- case :dets.lookup(@peer_db, pk) do
- [{^pk, nil, ip, port}] ->
- add_peer(ip, port, state)
- _ -> nil
- end
- end
- {:noreply, state}
- end
-
- def handle_cast({:add_peer, ip, port}, state) do
- add_peer(ip, port, state)
- {:noreply, state}
- end
-
def handle_info({:DOWN, _, :process, pid, _}, state) do
:ets.match_delete(:shard_procs, {:_, pid})
{:noreply, state}
end
- defp add_peer(ip, port, state) do
- spawn fn ->
- case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do
- {:ok, client} ->
- {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port, is_client: true, auth: nil}})
- :ok = :gen_tcp.controlling_process(client, pid)
- _ ->
- Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped"
- end
- end
- end
-
# ======================
# CALLED BY SNet.TcpConn
@@ -305,8 +279,17 @@ defmodule Shard.Manager do
@doc"""
Connect to a peer specified by ip address and port
"""
- def add_peer(ip, port) do
- GenServer.cast(__MODULE__, {:add_peer, ip, port})
+ def add_peer({:inet, ip, port}) do
+ spawn fn ->
+ case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do
+ {:ok, client} ->
+ my_port = Application.get_env(:shard, :port)
+ {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port, is_client: true, auth: nil}})
+ :ok = :gen_tcp.controlling_process(client, pid)
+ _ ->
+ Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped"
+ end
+ end
end
@doc"""
diff --git a/shard/lib/net/tcpconn.ex b/shard/lib/net/tcpconn.ex
index 5dbf42b..67d7f4c 100644
--- a/shard/lib/net/tcpconn.ex
+++ b/shard/lib/net/tcpconn.ex
@@ -147,7 +147,7 @@ defmodule SNet.TCPConn do
{:ok, {addr, port}} = :inet.peername socket
state = stream_param
|> Map.put(:socket, socket)
- |> Map.put(:peer_info, {:tcp4, addr, port})
+ |> Map.put(:peer_info, {:inet, addr, port})
|> Map.put(:my_port, state.my_port)
GenServer.cast(Shard.Manager, {:peer_up, self(), state.peer_info, state.auth})
@@ -259,7 +259,7 @@ defmodule SNet.TCPConn do
{:ok, {addr, port}} = :inet.peername socket
state = stream_param
|> Map.put(:socket, socket)
- |> Map.put(:peer_info, {:tcp4, addr, his_port})
+ |> Map.put(:peer_info, {:inet, addr, his_port})
|> Map.put(:my_port, state.my_port)
GenServer.cast(Shard.Manager, {:peer_up, self(), state.peer_info, state.auth})
diff --git a/shard/lib/net/tcpserver.ex b/shard/lib/net/tcpserver.ex
index 1aa5738..6cc3473 100644
--- a/shard/lib/net/tcpserver.ex
+++ b/shard/lib/net/tcpserver.ex
@@ -2,14 +2,15 @@ defmodule SNet.TCPServer do
require Logger
use Task, restart: :permanent
- def start_link(port) do
- Task.start_link(__MODULE__, :accept, [port])
+ def start_link(_) do
+ Task.start_link(__MODULE__, :accept, [])
end
@doc """
Starts accepting connections on the given `port`.
"""
- def accept(port) do
+ def accept() do
+ port = Application.get_env(:shard, :port)
{:ok, socket} = :gen_tcp.listen(port,
[:binary, packet: 2, active: false, reuseaddr: true])
Logger.info "Accepting connections on port #{port}"