aboutsummaryrefslogtreecommitdiff
path: root/shard
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2018-10-11 15:11:52 +0200
committerAlex Auvolat <alex@adnab.me>2018-10-11 15:11:52 +0200
commit1646bc57eae9880fd408d23ca692364dc6fd6442 (patch)
tree0f65d47498f33772152b20ee082d34e218f16df4 /shard
parenteb8c949551ffb8b3600357d7ff2bebe750af96e5 (diff)
downloadshard-1646bc57eae9880fd408d23ca692364dc6fd6442.tar.gz
shard-1646bc57eae9880fd408d23ca692364dc6fd6442.zip
Move somme functionnality to SNet.Manager
Diffstat (limited to 'shard')
-rw-r--r--shard/lib/app/chat.ex6
-rw-r--r--shard/lib/app/identity.ex6
-rw-r--r--shard/lib/app/pagestore.ex8
-rw-r--r--shard/lib/application.ex7
-rw-r--r--shard/lib/cli/cli.ex2
-rw-r--r--shard/lib/manager.ex103
-rw-r--r--shard/lib/net/group.ex17
-rw-r--r--shard/lib/net/manager.ex119
-rw-r--r--shard/lib/net/tcpconn.ex25
-rw-r--r--shard/lib/net/tcpserver.ex3
10 files changed, 158 insertions, 138 deletions
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex
index 051dfef..53767ef 100644
--- a/shard/lib/app/chat.ex
+++ b/shard/lib/app/chat.ex
@@ -118,7 +118,7 @@ defmodule SApp.Chat do
end
end
- notif = {:append, prev_root, msgitem, mst.root}
+ notif = {state.id, nil, {:append, prev_root, msgitem, mst.root}}
SNet.Group.broadcast(state.group, notif)
{:noreply, state}
@@ -130,7 +130,7 @@ defmodule SApp.Chat do
"""
def handle_cast({:interested, conn_pid, auth}, state) do
if SNet.Group.in_group?(state.group, conn_pid, auth) do
- Shard.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root}})
+ SNet.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root}})
end
{:noreply, state}
end
@@ -157,7 +157,7 @@ defmodule SApp.Chat do
else
state = case msg do
{:get_manifest} ->
- Shard.Manager.send_pid(conn_pid, {state.id, nil, {:manifest, state.manifest}})
+ SNet.Manager.send_pid(conn_pid, {state.id, nil, {:manifest, state.manifest}})
state
{:append, prev_root, msgitem, new_root} ->
# Append message: one single mesage has arrived
diff --git a/shard/lib/app/identity.ex b/shard/lib/app/identity.ex
index d2748a1..391d37e 100644
--- a/shard/lib/app/identity.ex
+++ b/shard/lib/app/identity.ex
@@ -84,14 +84,14 @@ defmodule SApp.Identity do
end
def handle_cast(:init_pull, state) do
- for {_, pid, _} <- Shard.Manager.list_connections do
+ for {_, pid, _} <- SNet.Manager.list_connections do
GenServer.cast(pid, {:send_msg, {:interested, [state.id]}})
end
{:noreply, state}
end
def handle_cast({:interested, peer_pid, _auth}, state) do
- Shard.Manager.send_pid(peer_pid, {state.id, nil, {:update, SData.SignRev.signed(state.state)}})
+ SNet.Manager.send_pid(peer_pid, {state.id, nil, {:update, SData.SignRev.signed(state.state)}})
{:noreply, state}
end
@@ -115,7 +115,7 @@ defmodule SApp.Identity do
def bcast_state(state, exclude \\ []) do
for peer_id <- Shard.Manager.get_shard_peers(state.id) do
if not Enum.member? exclude, peer_id do
- Shard.Manager.send(peer_id, {state.id, nil, {:update, SData.SignRev.signed(state.state)}})
+ SNet.Manager.send(peer_id, {state.id, nil, {:update, SData.SignRev.signed(state.state)}})
end
end
end
diff --git a/shard/lib/app/pagestore.ex b/shard/lib/app/pagestore.ex
index e09c513..ad18eac 100644
--- a/shard/lib/app/pagestore.ex
+++ b/shard/lib/app/pagestore.ex
@@ -107,7 +107,7 @@ defmodule SApp.PageStore do
case prefer_ask do
[_|_] ->
for peer <- prefer_ask do
- Shard.Manager.send_pid(peer, {state.shard_id, state.path, {:get, key}})
+ SNet.Manager.send_pid(peer, {state.shard_id, state.path, {:get, key}})
end
_ ->
ask_random_peers(state, key)
@@ -128,9 +128,9 @@ defmodule SApp.PageStore do
{:get, key} ->
case :dets.lookup state.store, key do
[{_, _, bin}] ->
- Shard.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:info, key, bin}})
+ SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:info, key, bin}})
_ ->
- Shard.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:not_found, key}})
+ SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:not_found, key}})
end
state
{:info, hash, bin} ->
@@ -251,7 +251,7 @@ defmodule SApp.PageStore do
|> Enum.shuffle
|> Enum.take(3)
for peer <- peers do
- Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
+ SNet.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
end
end
diff --git a/shard/lib/application.ex b/shard/lib/application.ex
index 0b61cc0..0daf48e 100644
--- a/shard/lib/application.ex
+++ b/shard/lib/application.ex
@@ -17,13 +17,14 @@ defmodule Shard.Application do
# Applications & data store
Shard.Manager,
+
+ # Networking
+ SNet.Manager,
+ SNet.TCPServer,
# Keys & identities
Shard.Keys,
{ Task, fn -> Shard.Keys.get_any_identity end },
-
- # Networking
- SNet.TCPServer,
]
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
diff --git a/shard/lib/cli/cli.ex b/shard/lib/cli/cli.ex
index 5f3dc7f..f7e8525 100644
--- a/shard/lib/cli/cli.ex
+++ b/shard/lib/cli/cli.ex
@@ -72,7 +72,7 @@ defmodule SCLI do
defp handle_command(state, ["connect", ipstr, portstr]) do
{:ok, ip} = :inet.parse_address (to_charlist ipstr)
{port, _} = Integer.parse portstr
- Shard.Manager.add_peer({:inet, ip, port})
+ SNet.Manager.add_peer({:inet, ip, port})
state
end
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex
index 1e089ba..9def229 100644
--- a/shard/lib/manager.ex
+++ b/shard/lib/manager.ex
@@ -40,23 +40,6 @@ defmodule Shard.Manager do
List of
{ {id, path}, pid }
-
- - :connections (not persistent)
-
- List of
- { peer_info, pid, nil | {my_pk, his_pk} }
-
- And an internal table :
-
- - :outbox (not persistent)
-
- Multi-list of
- { dest_peer_info, message, time_inserted }
-
- dest := peer_info
-
- No support for messages on authenticated channels
-
"""
use GenServer
@@ -82,10 +65,8 @@ defmodule Shard.Manager do
:dets.open_file(@peer_db, [type: :bag])
:ets.new(:shard_procs, [:set, :protected, :named_table])
- :ets.new(:connections, [:bag, :protected, :named_table])
- outbox = :ets.new(:outbox, [:bag, :private])
- {:ok, %{outbox: outbox} }
+ {:ok, nil}
end
def handle_call({:register, shard_id, manifest, pid}, _from, state) do
@@ -131,41 +112,6 @@ defmodule Shard.Manager do
{:noreply, state}
end
- def handle_cast({:peer_up, pid, peer_info, auth}, state) do
- :ets.insert(:connections, {peer_info, pid, auth})
-
- # Send interested message for all our shards
- id_list = (for [{id, _, _}] <- :dets.match(@shard_db, :"$1"), do: id)
- GenServer.cast(pid, {:send_msg, {:interested, id_list}})
-
- # Send queued messages
- for {_, msg, _} <- :ets.lookup(state.outbox, peer_info) do
- GenServer.cast(pid, {:send_msg, msg})
- end
- :ets.delete(state.outbox, peer_info)
-
- {:noreply, state}
- end
-
- def handle_cast({:peer_down, peer_pid, peer_info, auth}, state) do
- :ets.match_delete(:connections, {peer_info, peer_pid, auth})
- {:noreply, state}
- end
-
- def handle_cast({:connect_and_send, peer_info, msg}, state) do
- case :ets.lookup(:connections, peer_info) do
- [{_, pid, _}|_] ->
- GenServer.cast(pid, {:send_msg, msg})
- [] ->
- add_peer(peer_info)
- currtime = System.os_time :second
- :ets.insert(state.outbox, {peer_info, msg, currtime})
- outbox_cleanup = [ {{:_, :_, :'$1'}, [{:<, :'$1', currtime - 60}], [true]} ]
- :ets.select_delete(state.outbox, outbox_cleanup)
- end
- {:noreply, state}
- end
-
def handle_info({:DOWN, _, :process, pid, _}, state) do
:ets.match_delete(:shard_procs, {:_, pid})
{:noreply, state}
@@ -212,26 +158,6 @@ defmodule Shard.Manager do
# ================
@doc"""
- Send message to a peer specified by peer id
- """
- def send_pid(pid, msg) do
- GenServer.cast(pid, {:send_msg, msg})
- end
-
- @doc"""
- Send message to a peer specified by peer info.
- Opens a connection if necessary.
- """
- def send(peer_info, msg) do
- case :ets.lookup(:connections, peer_info) do
- [{^peer_info, pid, _auth}|_] ->
- GenServer.cast(pid, {:send_msg, msg})
- [] ->
- GenServer.cast(__MODULE__, {:connect_and_send, peer_info, msg})
- end
- end
-
- @doc"""
Register a process as the main process for a shard.
Returns either :ok or :redundant, in which case the process must exit.
@@ -277,22 +203,6 @@ defmodule Shard.Manager do
# ================
@doc"""
- Connect to a peer specified by ip address and port
- """
- def add_peer({:inet, ip, port}) do
- spawn fn ->
- case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do
- {:ok, client} ->
- my_port = Application.get_env(:shard, :port)
- {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port, is_client: true, auth: nil}})
- :ok = :gen_tcp.controlling_process(client, pid)
- _ ->
- Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped"
- end
- end
- end
-
- @doc"""
Returns the pid for a shard if it exists
"""
def find_proc(shard_id) do
@@ -308,15 +218,4 @@ defmodule Shard.Manager do
def list_shards() do
for [x] <- :dets.match(@shard_db, :"$1"), do: x
end
-
- @doc"""
- Return the list of all connected peers
- """
- def list_connections() do
- for [x] <- :ets.match(:connections, :"$1"), do: x
- end
-
- def get_connections_to(peer_info) do
- for {^peer_info, pid, auth} <- :ets.lookup(:connections, peer_info), do: {pid, auth}
- end
end
diff --git a/shard/lib/net/group.ex b/shard/lib/net/group.ex
index d2c2537..692438a 100644
--- a/shard/lib/net/group.ex
+++ b/shard/lib/net/group.ex
@@ -36,12 +36,12 @@ defmodule SNet.PubShardGroup do
def init_lookup(%SNet.PubShardGroup{id: id}, _notify_to) do
# For now: ask all currently connected peers and connect to new peers we know of
spawn fn ->
- for {_, pid, _} <- Shard.Manager.list_connections do
+ for {_, pid, _} <- SNet.Manager.list_connections do
GenServer.cast(pid, {:send_msg, {:interested, [id]}})
end
for peer_info <- Shard.Manager.get_shard_peers id do
- if Shard.Manager.get_connections_to peer_info == [] do
- Shard.Manager.add_peer(peer_info) # TODO callback when connected
+ if SNet.Manager.get_connections_to peer_info == [] do
+ SNet.Manager.add_peer(peer_info) # TODO callback when connected
end
end
end
@@ -49,9 +49,10 @@ defmodule SNet.PubShardGroup do
end
def get_connections(%SNet.PubShardGroup{id: id}) do
- for peer_info <- Shard.Manager.get_shard_peers(id),
- [{pid, _auth}|_] = Shard.Manager.get_connections_to(peer_info),
- do: pid
+ Shard.Manager.get_shard_peers(id)
+ |> Enum.map(&(SNet.Manager.get_connections_to(&1)))
+ |> Enum.filter(&(&1 != []))
+ |> Enum.map(fn [{pid, _auth}|_] -> pid end)
end
def broadcast(group, msg, nmax) do
@@ -63,10 +64,10 @@ defmodule SNet.PubShardGroup do
|> Enum.count
if nmax - nsent > 0 do
Shard.Manager.get_shard_peers(id)
- |> Enum.filter(&(Shard.Manager.get_connections_to(&1) == []))
+ |> Enum.filter(&(SNet.Manager.get_connections_to(&1) == []))
|> Enum.shuffle
|> Enum.take(nmax - nsent)
- |> Enum.map(&(Shard.Manager.send(&1, msg)))
+ |> Enum.map(&(SNet.Manager.send(&1, msg)))
end
end
diff --git a/shard/lib/net/manager.ex b/shard/lib/net/manager.ex
new file mode 100644
index 0000000..17d6e06
--- /dev/null
+++ b/shard/lib/net/manager.ex
@@ -0,0 +1,119 @@
+defmodule SNet.Manager do
+ @moduledoc"""
+ - :connections (not persistent)
+
+ List of
+ { peer_info, pid, nil | {my_pk, his_pk} }
+ """
+
+ use GenServer
+
+ require Logger
+
+ def start_link(_) do
+ GenServer.start_link(__MODULE__, nil, name: __MODULE__)
+ end
+
+ def init(_) do
+ Process.flag(:trap_exit, true)
+
+ :ets.new(:connections, [:bag, :protected, :named_table])
+
+ {:ok, nil}
+ end
+
+ def handle_call({:add_peer, peer_info}, _from, state) do
+ pid = add_peer_internal(peer_info)
+ {:reply, pid, state}
+ end
+
+ def handle_call({:accept, client}, _from, state) do
+ my_port = Application.get_env(:shard, :port)
+ {:ok, pid} = SNet.TCPConn.start_link(%{socket: client, my_port: my_port})
+ {:reply, pid, state}
+ end
+
+ def handle_call({:peer_up, pid, peer_info, auth}, _from, state) do
+ case :ets.match(:connections, {peer_info, :_, auth}) do
+ [{_, pid2, _}] when pid2 != pid ->
+ {:reply, :redundant, state}
+ _ ->
+ :ets.insert(:connections, {peer_info, pid, auth})
+
+ # Send interested message for all our shards
+ id_list = (for {id, _, _} <- Shard.Manager.list_shards(), do: id)
+ GenServer.cast(pid, {:send_msg, {:interested, id_list}})
+
+ {:reply, :ok, state}
+ end
+ end
+
+ def handle_cast({:connect_and_send, peer_info, msg}, state) do
+ pid = add_peer_internal(peer_info)
+ GenServer.cast(pid, {:send_msg, msg})
+ {:noreply, state}
+ end
+
+ def handle_info({:EXIT, pid, _reason}, state) do
+ :ets.match_delete(:connections, {:_, pid, :_})
+ {:noreply, state}
+ end
+
+ defp add_peer_internal(peer_info) do
+ case :ets.lookup(:connections, peer_info) do
+ [{_, pid, _}|_] ->
+ pid
+ [] ->
+ my_port = Application.get_env(:shard, :port)
+ {:ok, pid} = SNet.TCPConn.start_link(%{connect_to: peer_info, my_port: my_port, auth: nil})
+ :ets.insert(:connections, {peer_info, pid, nil})
+ pid
+ end
+ end
+
+ # =========
+ # INTERFACE
+ # =========
+
+ @doc"""
+ Connect to a peer specified by ip address and port
+ """
+ def add_peer(peer_info) do
+ GenServer.call(__MODULE__, {:add_peer, peer_info})
+ end
+
+ @doc"""
+ Return the list of all connected peers
+ """
+ def list_connections() do
+ for [x] <- :ets.match(:connections, :"$1"), do: x
+ end
+
+ @doc"""
+ Return the list of connections to a given peer, possibly with different auth
+ """
+ def get_connections_to(peer_info) do
+ for {^peer_info, pid, auth} <- :ets.lookup(:connections, peer_info), do: {pid, auth}
+ end
+
+ @doc"""
+ Send message to a peer specified by peer info.
+ Opens a connection if necessary.
+ """
+ def send(peer_info, msg) do
+ case :ets.lookup(:connections, peer_info) do
+ [{^peer_info, pid, _auth}|_] ->
+ GenServer.cast(pid, {:send_msg, msg})
+ [] ->
+ GenServer.cast(__MODULE__, {:connect_and_send, peer_info, msg})
+ end
+ end
+
+ @doc"""
+ Send message to a peer specified by peer id
+ """
+ def send_pid(pid, msg) do
+ GenServer.cast(pid, {:send_msg, msg})
+ end
+end
+
diff --git a/shard/lib/net/tcpconn.ex b/shard/lib/net/tcpconn.ex
index 67d7f4c..476c426 100644
--- a/shard/lib/net/tcpconn.ex
+++ b/shard/lib/net/tcpconn.ex
@@ -33,23 +33,21 @@ defmodule SNet.TCPConn do
end
def init(state) do
- if state.is_client do
- GenServer.cast(self(), :client_handshake)
- else
+ if Map.has_key?(state, :socket) do
GenServer.cast(self(), :server_handshake)
+ else
+ GenServer.cast(self(), :client_handshake)
end
-
{:ok, state}
end
-
def handle_call(:get_peer_info, _from, state) do
{:reply, state.peer_info, state}
end
-
def handle_cast(:client_handshake, state) do
- socket = state.socket
+ {:inet, ip, port} = state.connect_to
+ {:ok, socket} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false])
net_key = Application.get_env(:shard, :network_key)
%{public: cli_eph_pk, secret: cli_eph_sk} = :enacl.box_keypair
@@ -150,9 +148,11 @@ defmodule SNet.TCPConn do
|> Map.put(:peer_info, {:inet, addr, port})
|> Map.put(:my_port, state.my_port)
- GenServer.cast(Shard.Manager, {:peer_up, self(), state.peer_info, state.auth})
- Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}"
+ if GenServer.call(SNet.Manager, {:peer_up, self(), state.peer_info, state.auth}) == :redundant do
+ exit :redundant
+ end
+ Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}"
{:noreply, state}
end
@@ -262,9 +262,11 @@ defmodule SNet.TCPConn do
|> Map.put(:peer_info, {:inet, addr, his_port})
|> Map.put(:my_port, state.my_port)
- GenServer.cast(Shard.Manager, {:peer_up, self(), state.peer_info, state.auth})
- Logger.info "New peer: #{print_id state} at #{inspect state.peer_info} (#{port})"
+ if GenServer.call(SNet.Manager, {:peer_up, self(), state.peer_info, state.auth}) == :redundant do
+ exit :redundant
+ end
+ Logger.info "New peer: #{print_id state} at #{inspect state.peer_info} (#{port})"
{:noreply, state}
end
@@ -284,7 +286,6 @@ defmodule SNet.TCPConn do
def handle_info({:tcp_closed, _socket}, state) do
Logger.info "Disconnected: #{print_id state} at #{inspect state.peer_info}"
- GenServer.cast(Shard.Manager, {:peer_down, self(), state.peer_info, state.auth})
exit(:normal)
end
diff --git a/shard/lib/net/tcpserver.ex b/shard/lib/net/tcpserver.ex
index 6cc3473..d7326ad 100644
--- a/shard/lib/net/tcpserver.ex
+++ b/shard/lib/net/tcpserver.ex
@@ -19,8 +19,7 @@ defmodule SNet.TCPServer do
defp loop_acceptor(socket, my_port) do
{:ok, client} = :gen_tcp.accept(socket)
- {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor,
- {SNet.TCPConn, %{socket: client, my_port: my_port, is_client: false}})
+ pid = GenServer.call(SNet.Manager, {:accept, client})
:ok = :gen_tcp.controlling_process(client, pid)
loop_acceptor(socket, my_port)
end