From 28c9cac2e92011e2f7e6f23e93e8a5d9480a0922 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Oct 2018 17:52:25 +0200 Subject: Fix initiation issues --- shard/lib/app/chat.ex | 13 ++++++++----- shard/lib/app/identity.ex | 17 +++++++++++++---- shard/lib/net/group.ex | 16 +++++++++++----- shard/lib/net/manager.ex | 10 +++++++--- shard/lib/net/tcpconn.ex | 5 +++++ 5 files changed, 44 insertions(+), 17 deletions(-) diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex index 61403b8..f874e86 100644 --- a/shard/lib/app/chat.ex +++ b/shard/lib/app/chat.ex @@ -154,8 +154,7 @@ defmodule SApp.Chat do end def handle_cast({:peer_connected, conn_pid}, state) do - # this is called by the SNet.Group thing so it is already authenticated - SNet.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root}}) + GenServer.cast(conn_pid, {:send_msg, {:interested, [state.id]}}) {:noreply, state} end @@ -165,7 +164,7 @@ defmodule SApp.Chat do """ def handle_cast({:interested, conn_pid, auth}, state) do if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do - SNet.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root}}) + SNet.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root, true}}) end {:noreply, state} end @@ -225,13 +224,17 @@ defmodule SApp.Chat do state end end - {:root, new_root} -> - if new_root == state.mst.root do + {:root, new_root, ask_reply} -> + state = if new_root == state.mst.root do # already up to date, ignore state else init_merge(state, new_root, conn_pid) end + if ask_reply do + SNet.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root, false}}) + end + state x -> Logger.info("Unhandled message: #{inspect x}") state diff --git a/shard/lib/app/identity.ex b/shard/lib/app/identity.ex index 255bebb..390ef6d 100644 --- a/shard/lib/app/identity.ex +++ b/shard/lib/app/identity.ex @@ -88,15 +88,20 @@ defmodule SApp.Identity do end end + def handle_cast({:peer_connected, peer_pid}, state) do + GenServer.cast(peer_pid, {:send_msg, {:interested, [state.id]}}) + {:noreply, state} + end + def handle_cast({:interested, peer_pid, _auth}, state) do - SNet.Manager.send_pid(peer_pid, {state.id, nil, {:update, SData.SignRev.signed(state.state)}}) + SNet.Manager.send_pid(peer_pid, {state.id, nil, {:update, SData.SignRev.signed(state.state), true}}) {:noreply, state} end def handle_cast({:msg, conn_pid, _auth, _shard_id, nil, msg}, state) do state = case msg do - {:update, signed} when signed != nil -> - case SData.SignRev.merge(state.state, signed, state.pk) do + {:update, signed, ask_reply} when signed != nil -> + state = case SData.SignRev.merge(state.state, signed, state.pk) do {true, st2} -> Shard.Manager.save_state(state.id, st2) state = put_in(state.state, st2) @@ -105,6 +110,10 @@ defmodule SApp.Identity do {false, _} -> state end + if ask_reply do + SNet.Manager.send_pid(conn_pid, {state.id, nil, {:update, SData.SignRev.signed(state.state), false}}) + end + state _ -> state end {:noreply, state} @@ -127,6 +136,6 @@ defmodule SApp.Identity do def bcast_state(state, _exclude \\ []) do # TODO: effectively apply exclude list - SNet.Group.broadcast(state.netgroup, {state.id, nil, {:update, SData.SignRev.signed(state.state)}}) + SNet.Group.broadcast(state.netgroup, {state.id, nil, {:update, SData.SignRev.signed(state.state), false}}) end end diff --git a/shard/lib/net/group.ex b/shard/lib/net/group.ex index f09d174..7086c2d 100644 --- a/shard/lib/net/group.ex +++ b/shard/lib/net/group.ex @@ -33,15 +33,18 @@ defmodule SNet.PubShardGroup do defstruct [:id] defimpl SNet.Group do - def init_lookup(%SNet.PubShardGroup{id: id}, _notify_to) do + def init_lookup(%SNet.PubShardGroup{id: id}, notify_to) do # For now: ask all currently connected peers and connect to new peers we know of spawn fn -> for {_, pid, _} <- SNet.Manager.list_connections do - GenServer.cast(pid, {:send_msg, {:interested, [id]}}) + GenServer.cast(notify_to, {:peer_connected, pid}) end for peer_info <- Shard.Manager.get_shard_peers id do if SNet.Manager.get_connections_to peer_info == [] do - SNet.Manager.add_peer(peer_info) # TODO callback when connected + SNet.Manager.add_peer(peer_info, + callback: fn pid -> + GenServer.cast(notify_to, {:peer_connected, pid}) + end) end end end @@ -96,8 +99,11 @@ defmodule SNet.PrivGroup do info = GenServer.call(pid, :get_info) if Map.has_key?(info, :peer_info) do for pi <- info.peer_info do - SNet.Manager.add_peer(pi, %SNet.Auth{my_pk: my_pk, his_pk: pk}) - # no callback here, we don't know if connect was successful + SNet.Manager.add_peer(pi, + auth: %SNet.Auth{my_pk: my_pk, his_pk: pk}, + callback: fn pid -> + GenServer.cast(notify_to, {:peer_connected, pid}) + end) end end end diff --git a/shard/lib/net/manager.ex b/shard/lib/net/manager.ex index 75307ee..624d7e6 100644 --- a/shard/lib/net/manager.ex +++ b/shard/lib/net/manager.ex @@ -22,8 +22,11 @@ defmodule SNet.Manager do {:ok, nil} end - def handle_call({:add_peer, peer_info, auth}, _from, state) do + def handle_call({:add_peer, peer_info, auth, callback}, _from, state) do pid = add_peer_internal(peer_info, auth) + if callback != nil do + GenServer.cast(pid, {:callback, callback}) + end {:reply, pid, state} end @@ -38,6 +41,7 @@ defmodule SNet.Manager do [[pid2]|_] when pid2 != pid -> {:reply, :redundant, state} _ -> + :ets.match_delete(:connections, {peer_info, pid, :_}) :ets.insert(:connections, {peer_info, pid, auth}) # Send interested message for all our shards @@ -81,8 +85,8 @@ defmodule SNet.Manager do @doc""" Connect to a peer specified by ip address and port """ - def add_peer(peer_info, auth \\ nil) do - GenServer.call(__MODULE__, {:add_peer, peer_info, auth}) + def add_peer(peer_info, opts \\ []) do + GenServer.call(__MODULE__, {:add_peer, peer_info, opts[:auth], opts[:callback]}) end @doc""" diff --git a/shard/lib/net/tcpconn.ex b/shard/lib/net/tcpconn.ex index 25dc839..bd169aa 100644 --- a/shard/lib/net/tcpconn.ex +++ b/shard/lib/net/tcpconn.ex @@ -284,6 +284,11 @@ defmodule SNet.TCPConn do end end + def handle_cast({:callback, cb}, state) do + cb.(self()) + {:noreply, state} + end + def handle_cast({:send_msg, msg}, state) do msgbin = :erlang.term_to_binary msg enc = :enacl.secretbox(msgbin, state.nonce_send, state.secret_send) -- cgit v1.2.3