From eb8c949551ffb8b3600357d7ff2bebe750af96e5 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Oct 2018 14:22:00 +0200 Subject: Address, broadcast group management --- shard/lib/app/chat.ex | 123 ++++++++++++++++++++++------------------------ shard/lib/app/identity.ex | 19 +++---- shard/lib/manager.ex | 4 ++ shard/lib/net/addr.ex | 27 ++++++++++ shard/lib/net/group.ex | 98 ++++++++++++++++++++++++++++++++++++ 5 files changed, 196 insertions(+), 75 deletions(-) create mode 100644 shard/lib/net/addr.ex create mode 100644 shard/lib/net/group.ex (limited to 'shard') diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex index 0f4d573..051dfef 100644 --- a/shard/lib/app/chat.ex +++ b/shard/lib/app/chat.ex @@ -5,7 +5,7 @@ defmodule SApp.Chat do Chat rooms are globally identified by their channel name. A chat room manifest is of the form: - {:chat, channel_name} + %SApp.Chat.Manifest{channel: channel_name} Future improvements: - message signing @@ -28,7 +28,7 @@ defmodule SApp.Chat do defimpl Shard.Manifest, for: Manifest do def start(m) do - DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, m.channel}) + DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, m}) end end @@ -36,15 +36,15 @@ defmodule SApp.Chat do @doc """ Start a process that connects to a given channel """ - def start_link(channel) do - GenServer.start_link(__MODULE__, channel) + def start_link(manifest) do + GenServer.start_link(__MODULE__, manifest) end @doc """ Initialize channel process. """ - def init(channel) do - manifest = %Manifest{channel: channel} + def init(manifest) do + %Manifest{channel: channel} = manifest id = SData.term_hash manifest case Shard.Manager.register(id, manifest, self()) do @@ -62,10 +62,12 @@ defmodule SApp.Chat do mst = %MST{store: %SApp.PageStore{pid: page_store}, cmp: &msg_cmp/2, root: root} - GenServer.cast(self(), :init_pull) + group = %SNet.PubShardGroup{id: id} + SNet.Group.init_lookup(group, self()) {:ok, %{channel: channel, id: id, + group: group, manifest: manifest, page_store: page_store, mst: mst, @@ -95,18 +97,6 @@ defmodule SApp.Chat do {:reply, ret, state} end - @doc """ - Implementation of the :init_pull handler, which is called when the - process starts. It contacts all currently connected peers and asks them to - send data for this channel if they have some. - """ - def handle_cast(:init_pull, state) do - for {_, pid, _} <- Shard.Manager.list_connections do - GenServer.cast(pid, {:send_msg, {:interested, [state.id]}}) - end - {:noreply, state} - end - @doc """ Implementation of the :chat_send handler. This is the main handler that is used to send a message to the chat room. Puts the message in the store and syncs @@ -129,9 +119,7 @@ defmodule SApp.Chat do end notif = {:append, prev_root, msgitem, mst.root} - for peer_info <- Shard.Manager.get_shard_peers(state.id) do - Shard.Manager.send(peer_info, {state.id, nil, notif}) - end + SNet.Group.broadcast(state.group, notif) {:noreply, state} end @@ -140,8 +128,10 @@ defmodule SApp.Chat do Implementation of the :interested handler, this is called when a peer we are connected to asks to recieve data for this channel. """ - def handle_cast({:interested, conn_pid, _auth}, state) do - Shard.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root}}) + def handle_cast({:interested, conn_pid, auth}, state) do + if SNet.Group.in_group?(state.group, conn_pid, auth) do + Shard.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root}}) + end {:noreply, state} end @@ -160,54 +150,59 @@ defmodule SApp.Chat do - `{:info, start, list, rest}`: put some messages and informs of the Merkle hash of the store of older messages. """ - def handle_cast({:msg, conn_pid, _auth, _shard_id, nil, msg}, state) do - state = case msg do - {:get_manifest} -> - Shard.Manager.send_pid(conn_pid, {state.id, nil, {:manifest, state.manifest}}) - state - {:append, prev_root, msgitem, new_root} -> - # Append message: one single mesage has arrived - if new_root == state.mst.root do - # We already have the message, do nothing + def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do + if not SNet.Group.in_group?(state.group, conn_pid, auth) do + # Ignore message + {:noreply, state} + else + state = case msg do + {:get_manifest} -> + Shard.Manager.send_pid(conn_pid, {state.id, nil, {:manifest, state.manifest}}) state - else - # Try adding the message - {pk, bin, sign} = msgitem - if Shard.Keys.verify(pk, bin, sign) == :ok do - if prev_root == state.mst.root do - # Only one new message, insert it directly - mst2 = MST.insert(state.mst, msgitem) - if mst2.root == new_root do - state = %{state | mst: mst2} - GenServer.cast(state.page_store, {:set_roots, [mst2.root]}) - Shard.Manager.save_state(state.id, mst2.root) - msg_callback(state, msgitem) - state + {:append, prev_root, msgitem, new_root} -> + # Append message: one single mesage has arrived + if new_root == state.mst.root do + # We already have the message, do nothing + state + else + # Try adding the message + {pk, bin, sign} = msgitem + if Shard.Keys.verify(pk, bin, sign) == :ok do + if prev_root == state.mst.root do + # Only one new message, insert it directly + mst2 = MST.insert(state.mst, msgitem) + if mst2.root == new_root do + state = %{state | mst: mst2} + GenServer.cast(state.page_store, {:set_roots, [mst2.root]}) + Shard.Manager.save_state(state.id, mst2.root) + msg_callback(state, msgitem) + state + else + Logger.warn("Invalid new root after inserting same message item!") + state + end else - Logger.warn("Invalid new root after inserting same message item!") - state + # Not a simple one-insertion transition, look at the whole tree + init_merge(state, new_root, conn_pid) end else - # Not a simple one-insertion transition, look at the whole tree - init_merge(state, new_root, conn_pid) + Logger.warn("Received message with invalid signature") + state end - else - Logger.warn("Received message with invalid signature") + end + {:root, new_root} -> + if new_root == state.mst.root do + # already up to date, ignore state + else + init_merge(state, new_root, conn_pid) end - end - {:root, new_root} -> - if new_root == state.mst.root do - # already up to date, ignore + x -> + Logger.info("Unhandled message: #{inspect x}") state - else - init_merge(state, new_root, conn_pid) - end - x -> - Logger.info("Unhandled message: #{inspect x}") - state + end + {:noreply, state} end - {:noreply, state} end defp init_merge(state, new_root, source_peer_pid) do @@ -260,7 +255,7 @@ defmodule SApp.Chat do end end - defp msg_cmp({pk1, msgbin1, _sign1}, {pk2, msgbin2, _sign2}) do + def msg_cmp({pk1, msgbin1, _sign1}, {pk2, msgbin2, _sign2}) do {ts1, msg1} = SData.term_unbin msgbin1 {ts2, msg2} = SData.term_unbin msgbin2 cond do diff --git a/shard/lib/app/identity.ex b/shard/lib/app/identity.ex index 6909ad3..d2748a1 100644 --- a/shard/lib/app/identity.ex +++ b/shard/lib/app/identity.ex @@ -50,20 +50,17 @@ defmodule SApp.Identity do def find_proc(pk) do manifest = %Manifest{pk: pk} id = SData.term_hash manifest - Shard.Manager.find_proc id + case Shard.Manager.find_proc id do + nil -> + Shard.Manifest.start manifest + pid -> pid + end end def get_nick(pk) do - case find_proc pk do - nil -> - if Shard.Keys.valid_identity_pk? pk do - Shard.Manifest.start %Manifest{pk: pk} - end - default_nick pk - pid -> - info = GenServer.call(pid, :get_info) - info.nick - end + pid = find_proc pk + info = GenServer.call(pid, :get_info) + info.nick end def handle_call(:manifest, _from, state) do diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex index 08e14c7..1e089ba 100644 --- a/shard/lib/manager.ex +++ b/shard/lib/manager.ex @@ -315,4 +315,8 @@ defmodule Shard.Manager do def list_connections() do for [x] <- :ets.match(:connections, :"$1"), do: x end + + def get_connections_to(peer_info) do + for {^peer_info, pid, auth} <- :ets.lookup(:connections, peer_info), do: {pid, auth} + end end diff --git a/shard/lib/net/addr.ex b/shard/lib/net/addr.ex new file mode 100644 index 0000000..645e109 --- /dev/null +++ b/shard/lib/net/addr.ex @@ -0,0 +1,27 @@ +defmodule SNet.Addr do + + def get_if_inet4 do + {:ok, ifs} = :inet.getifaddrs + for {_, opts} <- ifs, + {:addr, addr} <- opts, + tuple_size(addr) == 4, + addr != {127,0,0,1} + do + addr + end + end + + def get_pub_inet4 do + Application.ensure_all_started(:inets) + Application.ensure_all_started(:ssl) + {:ok, {_, _, body}} = :httpc.request('http://api.ipify.org') + {:ok, addr} = :inet.parse_address body + addr + end + + def get_all_inet4 do + addrset = for x <- get_if_inet4(), into: %MapSet{}, do: x + addrset = MapSet.put(addrset, get_pub_inet4()) + MapSet.to_list addrset + end +end diff --git a/shard/lib/net/group.ex b/shard/lib/net/group.ex new file mode 100644 index 0000000..d2c2537 --- /dev/null +++ b/shard/lib/net/group.ex @@ -0,0 +1,98 @@ +defprotocol SNet.Group do + @moduledoc""" + A group is a specification of a bunch of peers we want and accept to talk to + about some things. It supports a number of abstract operations for finding peers, + broadcasting/gossiping, authenticating, etc. + """ + + @doc""" + Find new peers for this group, open connections and notify us when connections are open. + + Launches background processes if necessary, returns immediately. + """ + def init_lookup(group, notify_to) + + @doc""" + Get all currently open connections to peers in this group. + """ + def get_connections(group) + + @doc""" + Broadcast a message to peers of the group. + Will send to at most nmax peers, so this is a good primitive for gossip. + """ + def broadcast(group, msg, nmax \\ 10) + + @doc""" + Check if a peer is allowed to participate in this group. + """ + def in_group?(group, conn_pid, auth) +end + +defmodule SNet.PubShardGroup do + defstruct [:id] + + defimpl SNet.Group do + def init_lookup(%SNet.PubShardGroup{id: id}, _notify_to) do + # For now: ask all currently connected peers and connect to new peers we know of + spawn fn -> + for {_, pid, _} <- Shard.Manager.list_connections do + GenServer.cast(pid, {:send_msg, {:interested, [id]}}) + end + for peer_info <- Shard.Manager.get_shard_peers id do + if Shard.Manager.get_connections_to peer_info == [] do + Shard.Manager.add_peer(peer_info) # TODO callback when connected + end + end + end + # TODO: use a DHT to find peers + end + + def get_connections(%SNet.PubShardGroup{id: id}) do + for peer_info <- Shard.Manager.get_shard_peers(id), + [{pid, _auth}|_] = Shard.Manager.get_connections_to(peer_info), + do: pid + end + + def broadcast(group, msg, nmax) do + %SNet.PubShardGroup{id: id} = group + nsent = get_connections(group) + |> Enum.shuffle + |> Enum.take(nmax) + |> Enum.map(&(GenServer.cast(&1, {:send_msg, msg}))) + |> Enum.count + if nmax - nsent > 0 do + Shard.Manager.get_shard_peers(id) + |> Enum.filter(&(Shard.Manager.get_connections_to(&1) == [])) + |> Enum.shuffle + |> Enum.take(nmax - nsent) + |> Enum.map(&(Shard.Manager.send(&1, msg))) + end + end + + def in_group?(%SNet.PubShardGroup{id: _id}, _peer_pid, _auth) do + true # No access control + end + end +end + +defmodule SNet.PrivGroup do + defstruct [:pk_list] + + defimpl SNet.Group do + def init_lookup(%SNet.PubShardGroup{id: id}, notify_to) do + # TODO + end + + def get_connections(%SNet.PubShardGroup{id: id}) do + # TODO + end + + def broadcast(group, msg, nmax) do + end + + def in_group?(%SNet.PubShardGroup{id: _id}, peer_pid, auth) do + # TODO + end + end +end -- cgit v1.2.3