From e5a7330d0526efb592e200ab96c3f33585ae8d02 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Oct 2018 17:25:31 +0200 Subject: Initial support for private conversations --- shard/lib/app/chat.ex | 57 +++++++++++++++++++++++++++++------- shard/lib/app/identity.ex | 62 ++++++++++++++++++++++----------------- shard/lib/cli/cli.ex | 74 ++++++++++++++++++++++++++++++++++++++++++----- shard/lib/net/addr.ex | 4 +++ shard/lib/net/group.ex | 60 ++++++++++++++++++++++++++++++++++---- shard/lib/net/manager.ex | 58 +++++++++++++++++++++++++------------ shard/lib/net/tcpconn.ex | 42 ++++++++++++++++++--------- 7 files changed, 274 insertions(+), 83 deletions(-) (limited to 'shard') diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex index 53767ef..61403b8 100644 --- a/shard/lib/app/chat.ex +++ b/shard/lib/app/chat.ex @@ -7,6 +7,10 @@ defmodule SApp.Chat do %SApp.Chat.Manifest{channel: channel_name} + A private chat room manifest is of the form: + + %SApp.Chat.PrivChat.Manifest{pk_list: ordered_list_of_authorized_pks} + Future improvements: - message signing - storage of the chatroom messages to disk @@ -22,6 +26,10 @@ defmodule SApp.Chat do alias SData.MerkleSearchTree, as: MST + # ========= + # MANIFESTS + # ========= + defmodule Manifest do defstruct [:channel] end @@ -33,6 +41,24 @@ defmodule SApp.Chat do end + defmodule PrivChat.Manifest do + defstruct [:pk_list] + + def new(pk_list) do + %__MODULE__{pk_list: Enum.sort(pk_list)} + end + end + + defimpl Shard.Manifest, for: PrivChat.Manifest do + def start(m) do + DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, m}) + end + end + + # ========== + # MAIN LOGIC + # ========== + @doc """ Start a process that connects to a given channel """ @@ -44,7 +70,6 @@ defmodule SApp.Chat do Initialize channel process. """ def init(manifest) do - %Manifest{channel: channel} = manifest id = SData.term_hash manifest case Shard.Manager.register(id, manifest, self()) do @@ -62,12 +87,16 @@ defmodule SApp.Chat do mst = %MST{store: %SApp.PageStore{pid: page_store}, cmp: &msg_cmp/2, root: root} - group = %SNet.PubShardGroup{id: id} - SNet.Group.init_lookup(group, self()) + netgroup = case manifest do + %Manifest{channel: _channel} -> + %SNet.PubShardGroup{id: id} + %PrivChat.Manifest{pk_list: pk_list} -> + %SNet.PrivGroup{pk_list: pk_list} + end + SNet.Group.init_lookup(netgroup, self()) {:ok, - %{channel: channel, - id: id, - group: group, + %{id: id, + netgroup: netgroup, manifest: manifest, page_store: page_store, mst: mst, @@ -114,13 +143,19 @@ defmodule SApp.Chat do for pid <- state.subs do if Process.alive?(pid) do - send(pid, {:chat_send, state.channel, msgitem}) + send(pid, {:chat_send, state.manifest, msgitem}) end end notif = {state.id, nil, {:append, prev_root, msgitem, mst.root}} - SNet.Group.broadcast(state.group, notif) + SNet.Group.broadcast(state.netgroup, notif) + + {:noreply, state} + end + def handle_cast({:peer_connected, conn_pid}, state) do + # this is called by the SNet.Group thing so it is already authenticated + SNet.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root}}) {:noreply, state} end @@ -129,7 +164,7 @@ defmodule SApp.Chat do connected to asks to recieve data for this channel. """ def handle_cast({:interested, conn_pid, auth}, state) do - if SNet.Group.in_group?(state.group, conn_pid, auth) do + if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do SNet.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root}}) end {:noreply, state} @@ -151,7 +186,7 @@ defmodule SApp.Chat do Merkle hash of the store of older messages. """ def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do - if not SNet.Group.in_group?(state.group, conn_pid, auth) do + if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do # Ignore message {:noreply, state} else @@ -250,7 +285,7 @@ defmodule SApp.Chat do defp msg_callback(state, {pk, msgbin, sign}) do for pid <- state.subs do if Process.alive?(pid) do - send(pid, {:chat_recv, state.channel, {pk, msgbin, sign}}) + send(pid, {:chat_recv, state.manifest, {pk, msgbin, sign}}) end end end diff --git a/shard/lib/app/identity.ex b/shard/lib/app/identity.ex index 391d37e..255bebb 100644 --- a/shard/lib/app/identity.ex +++ b/shard/lib/app/identity.ex @@ -35,8 +35,12 @@ defmodule SApp.Identity do st -> st end - GenServer.cast(self(), :init_pull) - {:ok, %{pk: pk, id: id, state: state}} + netgroup = %SNet.PubShardGroup{id: id} + SNet.Group.init_lookup(netgroup, self()) + if Shard.Keys.have_sk? pk do + GenServer.cast(self(), :update_peer_info) + end + {:ok, %{pk: pk, id: id, state: state, netgroup: netgroup}} :redundant -> exit(:redundant) end @@ -52,7 +56,8 @@ defmodule SApp.Identity do id = SData.term_hash manifest case Shard.Manager.find_proc id do nil -> - Shard.Manifest.start manifest + {:ok, pid} = Shard.Manifest.start manifest + pid pid -> pid end end @@ -64,7 +69,7 @@ defmodule SApp.Identity do end def handle_call(:manifest, _from, state) do - {:replyl, state.manifest, state} + {:reply, state.manifest, state} end def handle_call(:get_info, _from, state) do @@ -72,22 +77,15 @@ defmodule SApp.Identity do end def handle_call({:set_info, new_info}, _from, state) do - case SData.SignRev.set(state.state, new_info, state.pk) do - {:ok, st2} -> - Shard.Manager.save_state(state.id, st2) - state = put_in(state.state, st2) - bcast_state(state) - {:reply, :ok, state} - err -> - {:reply, err, state} - end - end - - def handle_cast(:init_pull, state) do - for {_, pid, _} <- SNet.Manager.list_connections do - GenServer.cast(pid, {:send_msg, {:interested, [state.id]}}) + if Shard.Keys.have_sk?(state.pk) do + {:ok, st2} = SData.SignRev.set(state.state, new_info, state.pk) + Shard.Manager.save_state(state.id, st2) + state = put_in(state.state, st2) + bcast_state(state) + {:reply, :ok, state} + else + {:reply, :impossible, state} end - {:noreply, state} end def handle_cast({:interested, peer_pid, _auth}, state) do @@ -102,7 +100,7 @@ defmodule SApp.Identity do {true, st2} -> Shard.Manager.save_state(state.id, st2) state = put_in(state.state, st2) - bcast_state(state, [GenServer.call(conn_pid, :get_peer_info)]) + bcast_state(state, [conn_pid]) state {false, _} -> state @@ -112,11 +110,23 @@ defmodule SApp.Identity do {:noreply, state} end - def bcast_state(state, exclude \\ []) do - for peer_id <- Shard.Manager.get_shard_peers(state.id) do - if not Enum.member? exclude, peer_id do - SNet.Manager.send(peer_id, {state.id, nil, {:update, SData.SignRev.signed(state.state)}}) - end - end + def handle_cast(:update_peer_info, state) do + peer_info = SNet.Addr.get_all_inet4() + |> Enum.map(&({:inet, &1, Application.get_env(:shard, :port)})) + + prev_info = SData.SignRev.get(state.state) + # TODO multi peer info + new_info = Map.put(prev_info, :peer_info, peer_info) + {:ok, st2} = SData.SignRev.set(state.state, new_info, state.pk) + + Shard.Manager.save_state(state.id, st2) + state = put_in(state.state, st2) + bcast_state(state) + {:noreply, state} + end + + def bcast_state(state, _exclude \\ []) do + # TODO: effectively apply exclude list + SNet.Group.broadcast(state.netgroup, {state.id, nil, {:update, SData.SignRev.signed(state.state)}}) end end diff --git a/shard/lib/cli/cli.ex b/shard/lib/cli/cli.ex index f7e8525..85fa3fc 100644 --- a/shard/lib/cli/cli.ex +++ b/shard/lib/cli/cli.ex @@ -17,7 +17,7 @@ defmodule SCLI do end defp run(state) do - handle_messages() + handle_messages(state) id_pid = case state.id_pid do nil -> SApp.Identity.find_proc(state.pk) @@ -35,8 +35,16 @@ defmodule SCLI do prompt = case state.room_pid do nil -> "(no channel) #{nick}: " _ -> - %SApp.Chat.Manifest{channel: chan} = GenServer.call(state.room_pid, :manifest) - "##{chan} #{nick}: " + case GenServer.call(state.room_pid, :manifest) do + %SApp.Chat.Manifest{channel: chan} -> + "##{chan} #{nick}: " + %SApp.Chat.PrivChat.Manifest{pk_list: pk_list} -> + nicks = pk_list + |> Enum.filter(&(&1 != state.pk)) + |> Enum.map(&("#{SApp.Identity.get_nick &1} #{Shard.Keys.pk_display &1}")) + |> Enum.join(", ") + "PM #{nicks} #{nick}: " + end end str = prompt |> IO.gets |> String.trim @@ -55,16 +63,21 @@ defmodule SCLI do end end - defp handle_messages() do + defp handle_messages(state) do receive do - {:chat_recv, chan, {pk, msgbin, _sign}} -> + {:chat_recv, manifest, {pk, msgbin, _sign}} -> {ts, msg} = SData.term_unbin msgbin nick = SApp.Identity.get_nick pk - IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} ##{chan} <#{nick} #{Shard.Keys.pk_display pk}> #{msg}" - handle_messages() + case manifest do + %SApp.Chat.Manifest{channel: chan} -> + IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} ##{chan} <#{nick} #{Shard.Keys.pk_display pk}> #{msg}" + %SApp.Chat.PrivChat.Manifest{pk_list: pk_list} -> + IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} PM(#{Enum.count pk_list}) <#{nick} #{Shard.Keys.pk_display pk}> #{msg}" + end + handle_messages(state) {:chat_send, _, _} -> # do nothing - handle_messages() + handle_messages(state) after 10 -> nil end end @@ -105,13 +118,58 @@ defmodule SCLI do nil -> {:ok, pid} = Shard.Manifest.start %SApp.Chat.Manifest{channel: qchan} GenServer.cast(pid, {:subscribe, self()}) + IO.puts "Joining ##{qchan} (new shard)" %{state | room_pid: pid} pid -> + GenServer.cast(pid, {:subscribe, self()}) IO.puts "Switching to ##{qchan}" %{state | room_pid: pid} end end + defp handle_command(state, ["pm" | people_list]) do + known_people = for {_, %SApp.Identity.Manifest{pk: pk}, pid} <- Shard.Manager.list_shards() do + info = GenServer.call(pid, :get_info) + {pk, info.nick} + end + pk_list = for qname <- people_list do + candidates = for {pk, nick} <- known_people, + :binary.longest_common_prefix([qname, nick]) == byte_size(qname) + or :binary.longest_common_prefix([qname, Shard.Keys.pk_display pk]) == byte_size(qname), + do: {pk, nick} + case candidates do + [] -> + IO.puts "Not found: #{qname}" + :error + [{pk, _}] -> pk + _ -> + IO.puts "Several people matching for #{qname}:" + for {pk, nick} <- candidates do + IO.puts "- #{nick} #{Shard.Keys.pk_display pk}" + end + :error + end + end + if Enum.all?(pk_list, &(&1 != :error)) do + pk_list = [state.pk | pk_list] + manifest = SApp.Chat.PrivChat.Manifest.new(pk_list) + id = SData.term_hash manifest + case Shard.Manager.find_proc id do + nil -> + {:ok, pid} = Shard.Manifest.start manifest + GenServer.cast(pid, {:subscribe, self()}) + IO.puts "Joining private conversation (new shard)." + %{state | room_pid: pid} + pid -> + GenServer.cast(pid, {:subscribe, self()}) + IO.puts "Switching to private conversation." + %{state | room_pid: pid} + end + else + state + end + end + defp handle_command(state, ["nick", nick]) do pid = case state.id_pid do nil -> SApp.Identity.find_proc state.pk diff --git a/shard/lib/net/addr.ex b/shard/lib/net/addr.ex index 645e109..630f95a 100644 --- a/shard/lib/net/addr.ex +++ b/shard/lib/net/addr.ex @@ -24,4 +24,8 @@ defmodule SNet.Addr do addrset = MapSet.put(addrset, get_pub_inet4()) MapSet.to_list addrset end + + def is_local?({:inet, ip, port}) do + port == Application.get_env(:shard, :port) and (ip == {127,0,0,1} or ip in get_if_inet4()) + end end diff --git a/shard/lib/net/group.ex b/shard/lib/net/group.ex index 692438a..f09d174 100644 --- a/shard/lib/net/group.ex +++ b/shard/lib/net/group.ex @@ -81,19 +81,67 @@ defmodule SNet.PrivGroup do defstruct [:pk_list] defimpl SNet.Group do - def init_lookup(%SNet.PubShardGroup{id: id}, notify_to) do - # TODO + def init_lookup(%SNet.PrivGroup{pk_list: pk_list}, notify_to) do + spawn fn -> + # 1. We might already have some connections to these guys + for {_, pid, %SNet.Auth{my_pk: my_pk, his_pk: his_pk}} <- SNet.Manager.list_connections do + if (my_pk in pk_list) and (his_pk in pk_list) do + GenServer.cast(notify_to, {:peer_connected, pid}) + end + end + # 2. We might also want to open some new connections to these guys + [my_pk|_] = Enum.filter(pk_list, &Shard.Keys.have_sk?/1) + for pk <- pk_list do + pid = SApp.Identity.find_proc(pk) + info = GenServer.call(pid, :get_info) + if Map.has_key?(info, :peer_info) do + for pi <- info.peer_info do + SNet.Manager.add_peer(pi, %SNet.Auth{my_pk: my_pk, his_pk: pk}) + # no callback here, we don't know if connect was successful + end + end + end + end end - def get_connections(%SNet.PubShardGroup{id: id}) do - # TODO + def get_connections(%SNet.PrivGroup{pk_list: pk_list}) do + for {_, pid, %SNet.Auth{my_pk: my_pk, his_pk: his_pk}} <- SNet.Manager.list_connections, + (my_pk in pk_list) and (his_pk in pk_list), + do: pid end def broadcast(group, msg, nmax) do + %SNet.PrivGroup{pk_list: pk_list} = group + nsent = get_connections(group) + |> Enum.shuffle + |> Enum.take(nmax) + |> Enum.map(&(GenServer.cast(&1, {:send_msg, msg}))) + |> Enum.count + if nmax - nsent > 0 do + my_pks = Enum.filter(pk_list, &Shard.Keys.have_sk?/1) + [my_pk|_] = my_pks + candidates = for pk <- pk_list, + pid = SApp.Identity.find_proc(pk), + info = GenServer.call(pid, :get_info), + Map.has_key?(info, :peer_info), + xx <- info.peer_info, + do: {xx, pk} + candidates + |> Enum.filter(fn {peer_info, his_pk} -> + SNet.Manager.get_auth_connections_to(peer_info, my_pks, his_pk) == [] end) + |> Enum.shuffle() + |> Enum.take(nmax - nsent) + |> Enum.map(fn {peer_info, his_pk} -> + SNet.Manager.send_auth(peer_info, %SNet.Auth{my_pk: my_pk, his_pk: his_pk}, msg) end) + end end - def in_group?(%SNet.PubShardGroup{id: _id}, peer_pid, auth) do - # TODO + def in_group?(%SNet.PrivGroup{pk_list: pk_list}, _peer_pid, auth) do + case auth do + nil -> false + %SNet.Auth{my_pk: my_pk, his_pk: his_pk} -> + (my_pk in pk_list) and (his_pk in pk_list) + end end end end diff --git a/shard/lib/net/manager.ex b/shard/lib/net/manager.ex index 17d6e06..75307ee 100644 --- a/shard/lib/net/manager.ex +++ b/shard/lib/net/manager.ex @@ -22,8 +22,8 @@ defmodule SNet.Manager do {:ok, nil} end - def handle_call({:add_peer, peer_info}, _from, state) do - pid = add_peer_internal(peer_info) + def handle_call({:add_peer, peer_info, auth}, _from, state) do + pid = add_peer_internal(peer_info, auth) {:reply, pid, state} end @@ -34,8 +34,8 @@ defmodule SNet.Manager do end def handle_call({:peer_up, pid, peer_info, auth}, _from, state) do - case :ets.match(:connections, {peer_info, :_, auth}) do - [{_, pid2, _}] when pid2 != pid -> + case :ets.match(:connections, {peer_info, :'$1', auth}) do + [[pid2]|_] when pid2 != pid -> {:reply, :redundant, state} _ -> :ets.insert(:connections, {peer_info, pid, auth}) @@ -48,8 +48,8 @@ defmodule SNet.Manager do end end - def handle_cast({:connect_and_send, peer_info, msg}, state) do - pid = add_peer_internal(peer_info) + def handle_cast({:connect_and_send, peer_info, auth, msg}, state) do + pid = add_peer_internal(peer_info, auth) GenServer.cast(pid, {:send_msg, msg}) {:noreply, state} end @@ -59,15 +59,18 @@ defmodule SNet.Manager do {:noreply, state} end - defp add_peer_internal(peer_info) do - case :ets.lookup(:connections, peer_info) do - [{_, pid, _}|_] -> - pid - [] -> - my_port = Application.get_env(:shard, :port) - {:ok, pid} = SNet.TCPConn.start_link(%{connect_to: peer_info, my_port: my_port, auth: nil}) - :ets.insert(:connections, {peer_info, pid, nil}) - pid + defp add_peer_internal(peer_info, auth) do + if SNet.Addr.is_local? peer_info do + nil + else + case :ets.match(:connections, {peer_info, :'$1', (if auth != nil do auth else :_ end)}) do + [[pid]|_] -> pid + [] -> + my_port = Application.get_env(:shard, :port) + {:ok, pid} = SNet.TCPConn.start_link(%{connect_to: peer_info, my_port: my_port, auth: auth}) + :ets.insert(:connections, {peer_info, pid, nil}) + pid + end end end @@ -78,8 +81,8 @@ defmodule SNet.Manager do @doc""" Connect to a peer specified by ip address and port """ - def add_peer(peer_info) do - GenServer.call(__MODULE__, {:add_peer, peer_info}) + def add_peer(peer_info, auth \\ nil) do + GenServer.call(__MODULE__, {:add_peer, peer_info, auth}) end @doc""" @@ -96,6 +99,16 @@ defmodule SNet.Manager do for {^peer_info, pid, auth} <- :ets.lookup(:connections, peer_info), do: {pid, auth} end + @doc""" + Return the list of connections to a given peer that match a given auth spec + """ + def get_auth_connections_to(peer_info, my_auth, his_auth) do + for {^peer_info, pid, %SNet.Auth{my_pk: my_pk, his_pk: his_pk}} <- :ets.lookup(:connections, peer_info), + my_pk == my_auth or my_pk in my_auth, + his_pk == his_auth or his_pk in his_auth, + do: pid + end + @doc""" Send message to a peer specified by peer info. Opens a connection if necessary. @@ -105,7 +118,16 @@ defmodule SNet.Manager do [{^peer_info, pid, _auth}|_] -> GenServer.cast(pid, {:send_msg, msg}) [] -> - GenServer.cast(__MODULE__, {:connect_and_send, peer_info, msg}) + GenServer.cast(__MODULE__, {:connect_and_send, peer_info, nil, msg}) + end + end + + def send_auth(peer_info, auth, msg) do + case :ets.match(:connections, {peer_info, :'$1', auth}) do + [[pid]|_] -> + GenServer.cast(pid, {:send_msg, msg}) + [] -> + GenServer.cast(__MODULE__, {:connect_and_send, peer_info, auth, msg}) end end diff --git a/shard/lib/net/tcpconn.ex b/shard/lib/net/tcpconn.ex index 476c426..25dc839 100644 --- a/shard/lib/net/tcpconn.ex +++ b/shard/lib/net/tcpconn.ex @@ -96,8 +96,7 @@ defmodule SNet.TCPConn do auth: nil, } - {cli_longterm_pk, srv_list_pk} -> - [srv_longterm_pk] = srv_list_pk + %SNet.Auth{my_pk: cli_longterm_pk, his_pk: srv_longterm_pk} -> cli_longterm_sk = Shard.Keys.get_sk cli_longterm_pk sh_sec_aB = :enacl.curve25519_scalarmult(cli_eph_sk, :enacl.crypto_sign_ed25519_public_to_curve25519(srv_longterm_pk)) @@ -148,12 +147,13 @@ defmodule SNet.TCPConn do |> Map.put(:peer_info, {:inet, addr, port}) |> Map.put(:my_port, state.my_port) - if GenServer.call(SNet.Manager, {:peer_up, self(), state.peer_info, state.auth}) == :redundant do - exit :redundant + case GenServer.call(SNet.Manager, {:peer_up, self(), state.peer_info, state.auth}) do + :ok -> + Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}" + {:noreply, state} + :redundant -> + exit :redundant end - - Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}" - {:noreply, state} end def handle_cast(:server_handshake, state) do @@ -209,9 +209,22 @@ defmodule SNet.TCPConn do _ -> # Client authenticates - srv_longterm_pk = state.my_auth # TODO this is not ok + srv_longterm_pk = Enum.find( + Shard.Keys.list_identities(), + fn srv_longterm_pk -> + srv_longterm_sk = Shard.Keys.get_sk srv_longterm_pk + sh_sec_aB = :enacl.curve25519_scalarmult(:enacl.crypto_sign_ed25519_secret_to_curve25519(srv_longterm_sk), cli_eph_pk) + key3 = :crypto.hash(:sha256, net_key <> sh_sec_ab <> sh_sec_aB) + case :enacl.secretbox_open(cli_auth, <<0 :: 24*8>>, key3) do + {:ok, _cli_auth_plain} -> true + _ -> false + end + end) + + if srv_longterm_pk == nil do + exit :bad_auth + end srv_longterm_sk = Shard.Keys.get_sk srv_longterm_pk - sh_sec_aB = :enacl.curve25519_scalarmult(:enacl.crypto_sign_ed25519_secret_to_curve25519(srv_longterm_sk), cli_eph_pk) key3 = :crypto.hash(:sha256, net_key <> sh_sec_ab <> sh_sec_aB) @@ -262,12 +275,13 @@ defmodule SNet.TCPConn do |> Map.put(:peer_info, {:inet, addr, his_port}) |> Map.put(:my_port, state.my_port) - if GenServer.call(SNet.Manager, {:peer_up, self(), state.peer_info, state.auth}) == :redundant do - exit :redundant + case GenServer.call(SNet.Manager, {:peer_up, self(), state.peer_info, state.auth}) do + :ok -> + Logger.info "New peer: #{print_id state} at #{inspect state.peer_info} (#{port})" + {:noreply, state} + :redundant -> + exit(:redundant) end - - Logger.info "New peer: #{print_id state} at #{inspect state.peer_info} (#{port})" - {:noreply, state} end def handle_cast({:send_msg, msg}, state) do -- cgit v1.2.3