aboutsummaryrefslogtreecommitdiff
path: root/shard/lib/net
diff options
context:
space:
mode:
Diffstat (limited to 'shard/lib/net')
-rw-r--r--shard/lib/net/addr.ex4
-rw-r--r--shard/lib/net/group.ex60
-rw-r--r--shard/lib/net/manager.ex58
-rw-r--r--shard/lib/net/tcpconn.ex42
4 files changed, 126 insertions, 38 deletions
diff --git a/shard/lib/net/addr.ex b/shard/lib/net/addr.ex
index 645e109..630f95a 100644
--- a/shard/lib/net/addr.ex
+++ b/shard/lib/net/addr.ex
@@ -24,4 +24,8 @@ defmodule SNet.Addr do
addrset = MapSet.put(addrset, get_pub_inet4())
MapSet.to_list addrset
end
+
+ def is_local?({:inet, ip, port}) do
+ port == Application.get_env(:shard, :port) and (ip == {127,0,0,1} or ip in get_if_inet4())
+ end
end
diff --git a/shard/lib/net/group.ex b/shard/lib/net/group.ex
index 692438a..f09d174 100644
--- a/shard/lib/net/group.ex
+++ b/shard/lib/net/group.ex
@@ -81,19 +81,67 @@ defmodule SNet.PrivGroup do
defstruct [:pk_list]
defimpl SNet.Group do
- def init_lookup(%SNet.PubShardGroup{id: id}, notify_to) do
- # TODO
+ def init_lookup(%SNet.PrivGroup{pk_list: pk_list}, notify_to) do
+ spawn fn ->
+ # 1. We might already have some connections to these guys
+ for {_, pid, %SNet.Auth{my_pk: my_pk, his_pk: his_pk}} <- SNet.Manager.list_connections do
+ if (my_pk in pk_list) and (his_pk in pk_list) do
+ GenServer.cast(notify_to, {:peer_connected, pid})
+ end
+ end
+ # 2. We might also want to open some new connections to these guys
+ [my_pk|_] = Enum.filter(pk_list, &Shard.Keys.have_sk?/1)
+ for pk <- pk_list do
+ pid = SApp.Identity.find_proc(pk)
+ info = GenServer.call(pid, :get_info)
+ if Map.has_key?(info, :peer_info) do
+ for pi <- info.peer_info do
+ SNet.Manager.add_peer(pi, %SNet.Auth{my_pk: my_pk, his_pk: pk})
+ # no callback here, we don't know if connect was successful
+ end
+ end
+ end
+ end
end
- def get_connections(%SNet.PubShardGroup{id: id}) do
- # TODO
+ def get_connections(%SNet.PrivGroup{pk_list: pk_list}) do
+ for {_, pid, %SNet.Auth{my_pk: my_pk, his_pk: his_pk}} <- SNet.Manager.list_connections,
+ (my_pk in pk_list) and (his_pk in pk_list),
+ do: pid
end
def broadcast(group, msg, nmax) do
+ %SNet.PrivGroup{pk_list: pk_list} = group
+ nsent = get_connections(group)
+ |> Enum.shuffle
+ |> Enum.take(nmax)
+ |> Enum.map(&(GenServer.cast(&1, {:send_msg, msg})))
+ |> Enum.count
+ if nmax - nsent > 0 do
+ my_pks = Enum.filter(pk_list, &Shard.Keys.have_sk?/1)
+ [my_pk|_] = my_pks
+ candidates = for pk <- pk_list,
+ pid = SApp.Identity.find_proc(pk),
+ info = GenServer.call(pid, :get_info),
+ Map.has_key?(info, :peer_info),
+ xx <- info.peer_info,
+ do: {xx, pk}
+ candidates
+ |> Enum.filter(fn {peer_info, his_pk} ->
+ SNet.Manager.get_auth_connections_to(peer_info, my_pks, his_pk) == [] end)
+ |> Enum.shuffle()
+ |> Enum.take(nmax - nsent)
+ |> Enum.map(fn {peer_info, his_pk} ->
+ SNet.Manager.send_auth(peer_info, %SNet.Auth{my_pk: my_pk, his_pk: his_pk}, msg) end)
+ end
end
- def in_group?(%SNet.PubShardGroup{id: _id}, peer_pid, auth) do
- # TODO
+ def in_group?(%SNet.PrivGroup{pk_list: pk_list}, _peer_pid, auth) do
+ case auth do
+ nil -> false
+ %SNet.Auth{my_pk: my_pk, his_pk: his_pk} ->
+ (my_pk in pk_list) and (his_pk in pk_list)
+ end
end
end
end
diff --git a/shard/lib/net/manager.ex b/shard/lib/net/manager.ex
index 17d6e06..75307ee 100644
--- a/shard/lib/net/manager.ex
+++ b/shard/lib/net/manager.ex
@@ -22,8 +22,8 @@ defmodule SNet.Manager do
{:ok, nil}
end
- def handle_call({:add_peer, peer_info}, _from, state) do
- pid = add_peer_internal(peer_info)
+ def handle_call({:add_peer, peer_info, auth}, _from, state) do
+ pid = add_peer_internal(peer_info, auth)
{:reply, pid, state}
end
@@ -34,8 +34,8 @@ defmodule SNet.Manager do
end
def handle_call({:peer_up, pid, peer_info, auth}, _from, state) do
- case :ets.match(:connections, {peer_info, :_, auth}) do
- [{_, pid2, _}] when pid2 != pid ->
+ case :ets.match(:connections, {peer_info, :'$1', auth}) do
+ [[pid2]|_] when pid2 != pid ->
{:reply, :redundant, state}
_ ->
:ets.insert(:connections, {peer_info, pid, auth})
@@ -48,8 +48,8 @@ defmodule SNet.Manager do
end
end
- def handle_cast({:connect_and_send, peer_info, msg}, state) do
- pid = add_peer_internal(peer_info)
+ def handle_cast({:connect_and_send, peer_info, auth, msg}, state) do
+ pid = add_peer_internal(peer_info, auth)
GenServer.cast(pid, {:send_msg, msg})
{:noreply, state}
end
@@ -59,15 +59,18 @@ defmodule SNet.Manager do
{:noreply, state}
end
- defp add_peer_internal(peer_info) do
- case :ets.lookup(:connections, peer_info) do
- [{_, pid, _}|_] ->
- pid
- [] ->
- my_port = Application.get_env(:shard, :port)
- {:ok, pid} = SNet.TCPConn.start_link(%{connect_to: peer_info, my_port: my_port, auth: nil})
- :ets.insert(:connections, {peer_info, pid, nil})
- pid
+ defp add_peer_internal(peer_info, auth) do
+ if SNet.Addr.is_local? peer_info do
+ nil
+ else
+ case :ets.match(:connections, {peer_info, :'$1', (if auth != nil do auth else :_ end)}) do
+ [[pid]|_] -> pid
+ [] ->
+ my_port = Application.get_env(:shard, :port)
+ {:ok, pid} = SNet.TCPConn.start_link(%{connect_to: peer_info, my_port: my_port, auth: auth})
+ :ets.insert(:connections, {peer_info, pid, nil})
+ pid
+ end
end
end
@@ -78,8 +81,8 @@ defmodule SNet.Manager do
@doc"""
Connect to a peer specified by ip address and port
"""
- def add_peer(peer_info) do
- GenServer.call(__MODULE__, {:add_peer, peer_info})
+ def add_peer(peer_info, auth \\ nil) do
+ GenServer.call(__MODULE__, {:add_peer, peer_info, auth})
end
@doc"""
@@ -97,6 +100,16 @@ defmodule SNet.Manager do
end
@doc"""
+ Return the list of connections to a given peer that match a given auth spec
+ """
+ def get_auth_connections_to(peer_info, my_auth, his_auth) do
+ for {^peer_info, pid, %SNet.Auth{my_pk: my_pk, his_pk: his_pk}} <- :ets.lookup(:connections, peer_info),
+ my_pk == my_auth or my_pk in my_auth,
+ his_pk == his_auth or his_pk in his_auth,
+ do: pid
+ end
+
+ @doc"""
Send message to a peer specified by peer info.
Opens a connection if necessary.
"""
@@ -105,7 +118,16 @@ defmodule SNet.Manager do
[{^peer_info, pid, _auth}|_] ->
GenServer.cast(pid, {:send_msg, msg})
[] ->
- GenServer.cast(__MODULE__, {:connect_and_send, peer_info, msg})
+ GenServer.cast(__MODULE__, {:connect_and_send, peer_info, nil, msg})
+ end
+ end
+
+ def send_auth(peer_info, auth, msg) do
+ case :ets.match(:connections, {peer_info, :'$1', auth}) do
+ [[pid]|_] ->
+ GenServer.cast(pid, {:send_msg, msg})
+ [] ->
+ GenServer.cast(__MODULE__, {:connect_and_send, peer_info, auth, msg})
end
end
diff --git a/shard/lib/net/tcpconn.ex b/shard/lib/net/tcpconn.ex
index 476c426..25dc839 100644
--- a/shard/lib/net/tcpconn.ex
+++ b/shard/lib/net/tcpconn.ex
@@ -96,8 +96,7 @@ defmodule SNet.TCPConn do
auth: nil,
}
- {cli_longterm_pk, srv_list_pk} ->
- [srv_longterm_pk] = srv_list_pk
+ %SNet.Auth{my_pk: cli_longterm_pk, his_pk: srv_longterm_pk} ->
cli_longterm_sk = Shard.Keys.get_sk cli_longterm_pk
sh_sec_aB = :enacl.curve25519_scalarmult(cli_eph_sk, :enacl.crypto_sign_ed25519_public_to_curve25519(srv_longterm_pk))
@@ -148,12 +147,13 @@ defmodule SNet.TCPConn do
|> Map.put(:peer_info, {:inet, addr, port})
|> Map.put(:my_port, state.my_port)
- if GenServer.call(SNet.Manager, {:peer_up, self(), state.peer_info, state.auth}) == :redundant do
- exit :redundant
+ case GenServer.call(SNet.Manager, {:peer_up, self(), state.peer_info, state.auth}) do
+ :ok ->
+ Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}"
+ {:noreply, state}
+ :redundant ->
+ exit :redundant
end
-
- Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}"
- {:noreply, state}
end
def handle_cast(:server_handshake, state) do
@@ -209,9 +209,22 @@ defmodule SNet.TCPConn do
_ ->
# Client authenticates
- srv_longterm_pk = state.my_auth # TODO this is not ok
+ srv_longterm_pk = Enum.find(
+ Shard.Keys.list_identities(),
+ fn srv_longterm_pk ->
+ srv_longterm_sk = Shard.Keys.get_sk srv_longterm_pk
+ sh_sec_aB = :enacl.curve25519_scalarmult(:enacl.crypto_sign_ed25519_secret_to_curve25519(srv_longterm_sk), cli_eph_pk)
+ key3 = :crypto.hash(:sha256, net_key <> sh_sec_ab <> sh_sec_aB)
+ case :enacl.secretbox_open(cli_auth, <<0 :: 24*8>>, key3) do
+ {:ok, _cli_auth_plain} -> true
+ _ -> false
+ end
+ end)
+
+ if srv_longterm_pk == nil do
+ exit :bad_auth
+ end
srv_longterm_sk = Shard.Keys.get_sk srv_longterm_pk
-
sh_sec_aB = :enacl.curve25519_scalarmult(:enacl.crypto_sign_ed25519_secret_to_curve25519(srv_longterm_sk), cli_eph_pk)
key3 = :crypto.hash(:sha256, net_key <> sh_sec_ab <> sh_sec_aB)
@@ -262,12 +275,13 @@ defmodule SNet.TCPConn do
|> Map.put(:peer_info, {:inet, addr, his_port})
|> Map.put(:my_port, state.my_port)
- if GenServer.call(SNet.Manager, {:peer_up, self(), state.peer_info, state.auth}) == :redundant do
- exit :redundant
+ case GenServer.call(SNet.Manager, {:peer_up, self(), state.peer_info, state.auth}) do
+ :ok ->
+ Logger.info "New peer: #{print_id state} at #{inspect state.peer_info} (#{port})"
+ {:noreply, state}
+ :redundant ->
+ exit(:redundant)
end
-
- Logger.info "New peer: #{print_id state} at #{inspect state.peer_info} (#{port})"
- {:noreply, state}
end
def handle_cast({:send_msg, msg}, state) do