diff options
authorAlex Auvolat <alex@adnab.me>2018-10-11 14:22:00 +0200
committerAlex Auvolat <alex@adnab.me>2018-10-11 14:22:00 +0200
commiteb8c949551ffb8b3600357d7ff2bebe750af96e5 (patch)
parent7b6042205e7c6135fae4e0d21dbf7a5975e8491b (diff)
Address, broadcast group management
5 files changed, 196 insertions, 75 deletions
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex
index 0f4d573..051dfef 100644
--- a/shard/lib/app/chat.ex
+++ b/shard/lib/app/chat.ex
@@ -5,7 +5,7 @@ defmodule SApp.Chat do
Chat rooms are globally identified by their channel name.
A chat room manifest is of the form:
- {:chat, channel_name}
+ %SApp.Chat.Manifest{channel: channel_name}
Future improvements:
- message signing
@@ -28,7 +28,7 @@ defmodule SApp.Chat do
defimpl Shard.Manifest, for: Manifest do
def start(m) do
- DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, m.channel})
+ DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, m})
@@ -36,15 +36,15 @@ defmodule SApp.Chat do
@doc """
Start a process that connects to a given channel
- def start_link(channel) do
- GenServer.start_link(__MODULE__, channel)
+ def start_link(manifest) do
+ GenServer.start_link(__MODULE__, manifest)
@doc """
Initialize channel process.
- def init(channel) do
- manifest = %Manifest{channel: channel}
+ def init(manifest) do
+ %Manifest{channel: channel} = manifest
id = SData.term_hash manifest
case Shard.Manager.register(id, manifest, self()) do
@@ -62,10 +62,12 @@ defmodule SApp.Chat do
mst = %MST{store: %SApp.PageStore{pid: page_store},
cmp: &msg_cmp/2,
root: root}
- GenServer.cast(self(), :init_pull)
+ group = %SNet.PubShardGroup{id: id}
+ SNet.Group.init_lookup(group, self())
%{channel: channel,
id: id,
+ group: group,
manifest: manifest,
page_store: page_store,
mst: mst,
@@ -96,18 +98,6 @@ defmodule SApp.Chat do
@doc """
- Implementation of the :init_pull handler, which is called when the
- process starts. It contacts all currently connected peers and asks them to
- send data for this channel if they have some.
- """
- def handle_cast(:init_pull, state) do
- for {_, pid, _} <- Shard.Manager.list_connections do
- GenServer.cast(pid, {:send_msg, {:interested, [state.id]}})
- end
- {:noreply, state}
- end
- @doc """
Implementation of the :chat_send handler. This is the main handler that is used
to send a message to the chat room. Puts the message in the store and syncs
with all connected peers.
@@ -129,9 +119,7 @@ defmodule SApp.Chat do
notif = {:append, prev_root, msgitem, mst.root}
- for peer_info <- Shard.Manager.get_shard_peers(state.id) do
- Shard.Manager.send(peer_info, {state.id, nil, notif})
- end
+ SNet.Group.broadcast(state.group, notif)
{:noreply, state}
@@ -140,8 +128,10 @@ defmodule SApp.Chat do
Implementation of the :interested handler, this is called when a peer we are
connected to asks to recieve data for this channel.
- def handle_cast({:interested, conn_pid, _auth}, state) do
- Shard.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root}})
+ def handle_cast({:interested, conn_pid, auth}, state) do
+ if SNet.Group.in_group?(state.group, conn_pid, auth) do
+ Shard.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root}})
+ end
{:noreply, state}
@@ -160,54 +150,59 @@ defmodule SApp.Chat do
- `{:info, start, list, rest}`: put some messages and informs of the
Merkle hash of the store of older messages.
- def handle_cast({:msg, conn_pid, _auth, _shard_id, nil, msg}, state) do
- state = case msg do
- {:get_manifest} ->
- Shard.Manager.send_pid(conn_pid, {state.id, nil, {:manifest, state.manifest}})
- state
- {:append, prev_root, msgitem, new_root} ->
- # Append message: one single mesage has arrived
- if new_root == state.mst.root do
- # We already have the message, do nothing
+ def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do
+ if not SNet.Group.in_group?(state.group, conn_pid, auth) do
+ # Ignore message
+ {:noreply, state}
+ else
+ state = case msg do
+ {:get_manifest} ->
+ Shard.Manager.send_pid(conn_pid, {state.id, nil, {:manifest, state.manifest}})
- else
- # Try adding the message
- {pk, bin, sign} = msgitem
- if Shard.Keys.verify(pk, bin, sign) == :ok do
- if prev_root == state.mst.root do
- # Only one new message, insert it directly
- mst2 = MST.insert(state.mst, msgitem)
- if mst2.root == new_root do
- state = %{state | mst: mst2}
- GenServer.cast(state.page_store, {:set_roots, [mst2.root]})
- Shard.Manager.save_state(state.id, mst2.root)
- msg_callback(state, msgitem)
- state
+ {:append, prev_root, msgitem, new_root} ->
+ # Append message: one single mesage has arrived
+ if new_root == state.mst.root do
+ # We already have the message, do nothing
+ state
+ else
+ # Try adding the message
+ {pk, bin, sign} = msgitem
+ if Shard.Keys.verify(pk, bin, sign) == :ok do
+ if prev_root == state.mst.root do
+ # Only one new message, insert it directly
+ mst2 = MST.insert(state.mst, msgitem)
+ if mst2.root == new_root do
+ state = %{state | mst: mst2}
+ GenServer.cast(state.page_store, {:set_roots, [mst2.root]})
+ Shard.Manager.save_state(state.id, mst2.root)
+ msg_callback(state, msgitem)
+ state
+ else
+ Logger.warn("Invalid new root after inserting same message item!")
+ state
+ end
- Logger.warn("Invalid new root after inserting same message item!")
- state
+ # Not a simple one-insertion transition, look at the whole tree
+ init_merge(state, new_root, conn_pid)
- # Not a simple one-insertion transition, look at the whole tree
- init_merge(state, new_root, conn_pid)
+ Logger.warn("Received message with invalid signature")
+ state
- else
- Logger.warn("Received message with invalid signature")
+ end
+ {:root, new_root} ->
+ if new_root == state.mst.root do
+ # already up to date, ignore
+ else
+ init_merge(state, new_root, conn_pid)
- end
- {:root, new_root} ->
- if new_root == state.mst.root do
- # already up to date, ignore
+ x ->
+ Logger.info("Unhandled message: #{inspect x}")
- else
- init_merge(state, new_root, conn_pid)
- end
- x ->
- Logger.info("Unhandled message: #{inspect x}")
- state
+ end
+ {:noreply, state}
- {:noreply, state}
defp init_merge(state, new_root, source_peer_pid) do
@@ -260,7 +255,7 @@ defmodule SApp.Chat do
- defp msg_cmp({pk1, msgbin1, _sign1}, {pk2, msgbin2, _sign2}) do
+ def msg_cmp({pk1, msgbin1, _sign1}, {pk2, msgbin2, _sign2}) do
{ts1, msg1} = SData.term_unbin msgbin1
{ts2, msg2} = SData.term_unbin msgbin2
cond do
diff --git a/shard/lib/app/identity.ex b/shard/lib/app/identity.ex
index 6909ad3..d2748a1 100644
--- a/shard/lib/app/identity.ex
+++ b/shard/lib/app/identity.ex
@@ -50,20 +50,17 @@ defmodule SApp.Identity do
def find_proc(pk) do
manifest = %Manifest{pk: pk}
id = SData.term_hash manifest
- Shard.Manager.find_proc id
+ case Shard.Manager.find_proc id do
+ nil ->
+ Shard.Manifest.start manifest
+ pid -> pid
+ end
def get_nick(pk) do
- case find_proc pk do
- nil ->
- if Shard.Keys.valid_identity_pk? pk do
- Shard.Manifest.start %Manifest{pk: pk}
- end
- default_nick pk
- pid ->
- info = GenServer.call(pid, :get_info)
- info.nick
- end
+ pid = find_proc pk
+ info = GenServer.call(pid, :get_info)
+ info.nick
def handle_call(:manifest, _from, state) do
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex
index 08e14c7..1e089ba 100644
--- a/shard/lib/manager.ex
+++ b/shard/lib/manager.ex
@@ -315,4 +315,8 @@ defmodule Shard.Manager do
def list_connections() do
for [x] <- :ets.match(:connections, :"$1"), do: x
+ def get_connections_to(peer_info) do
+ for {^peer_info, pid, auth} <- :ets.lookup(:connections, peer_info), do: {pid, auth}
+ end
diff --git a/shard/lib/net/addr.ex b/shard/lib/net/addr.ex
new file mode 100644
index 0000000..645e109
--- /dev/null
+++ b/shard/lib/net/addr.ex
@@ -0,0 +1,27 @@
+defmodule SNet.Addr do
+ def get_if_inet4 do
+ {:ok, ifs} = :inet.getifaddrs
+ for {_, opts} <- ifs,
+ {:addr, addr} <- opts,
+ tuple_size(addr) == 4,
+ addr != {127,0,0,1}
+ do
+ addr
+ end
+ end
+ def get_pub_inet4 do
+ Application.ensure_all_started(:inets)
+ Application.ensure_all_started(:ssl)
+ {:ok, {_, _, body}} = :httpc.request('http://api.ipify.org')
+ {:ok, addr} = :inet.parse_address body
+ addr
+ end
+ def get_all_inet4 do
+ addrset = for x <- get_if_inet4(), into: %MapSet{}, do: x
+ addrset = MapSet.put(addrset, get_pub_inet4())
+ MapSet.to_list addrset
+ end
diff --git a/shard/lib/net/group.ex b/shard/lib/net/group.ex
new file mode 100644
index 0000000..d2c2537
--- /dev/null
+++ b/shard/lib/net/group.ex
@@ -0,0 +1,98 @@
+defprotocol SNet.Group do
+ @moduledoc"""
+ A group is a specification of a bunch of peers we want and accept to talk to
+ about some things. It supports a number of abstract operations for finding peers,
+ broadcasting/gossiping, authenticating, etc.
+ """
+ @doc"""
+ Find new peers for this group, open connections and notify us when connections are open.
+ Launches background processes if necessary, returns immediately.
+ """
+ def init_lookup(group, notify_to)
+ @doc"""
+ Get all currently open connections to peers in this group.
+ """
+ def get_connections(group)
+ @doc"""
+ Broadcast a message to peers of the group.
+ Will send to at most nmax peers, so this is a good primitive for gossip.
+ """
+ def broadcast(group, msg, nmax \\ 10)
+ @doc"""
+ Check if a peer is allowed to participate in this group.
+ """
+ def in_group?(group, conn_pid, auth)
+defmodule SNet.PubShardGroup do
+ defstruct [:id]
+ defimpl SNet.Group do
+ def init_lookup(%SNet.PubShardGroup{id: id}, _notify_to) do
+ # For now: ask all currently connected peers and connect to new peers we know of
+ spawn fn ->
+ for {_, pid, _} <- Shard.Manager.list_connections do
+ GenServer.cast(pid, {:send_msg, {:interested, [id]}})
+ end
+ for peer_info <- Shard.Manager.get_shard_peers id do
+ if Shard.Manager.get_connections_to peer_info == [] do
+ Shard.Manager.add_peer(peer_info) # TODO callback when connected
+ end
+ end
+ end
+ # TODO: use a DHT to find peers
+ end
+ def get_connections(%SNet.PubShardGroup{id: id}) do
+ for peer_info <- Shard.Manager.get_shard_peers(id),
+ [{pid, _auth}|_] = Shard.Manager.get_connections_to(peer_info),
+ do: pid
+ end
+ def broadcast(group, msg, nmax) do
+ %SNet.PubShardGroup{id: id} = group
+ nsent = get_connections(group)
+ |> Enum.shuffle
+ |> Enum.take(nmax)
+ |> Enum.map(&(GenServer.cast(&1, {:send_msg, msg})))
+ |> Enum.count
+ if nmax - nsent > 0 do
+ Shard.Manager.get_shard_peers(id)
+ |> Enum.filter(&(Shard.Manager.get_connections_to(&1) == []))
+ |> Enum.shuffle
+ |> Enum.take(nmax - nsent)
+ |> Enum.map(&(Shard.Manager.send(&1, msg)))
+ end
+ end
+ def in_group?(%SNet.PubShardGroup{id: _id}, _peer_pid, _auth) do
+ true # No access control
+ end
+ end
+defmodule SNet.PrivGroup do
+ defstruct [:pk_list]
+ defimpl SNet.Group do
+ def init_lookup(%SNet.PubShardGroup{id: id}, notify_to) do
+ # TODO
+ end
+ def get_connections(%SNet.PubShardGroup{id: id}) do
+ # TODO
+ end
+ def broadcast(group, msg, nmax) do
+ end
+ def in_group?(%SNet.PubShardGroup{id: _id}, peer_pid, auth) do
+ # TODO
+ end
+ end