aboutsummaryrefslogtreecommitdiff
path: root/shard/lib/app
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2018-10-11 14:22:00 +0200
committerAlex Auvolat <alex@adnab.me>2018-10-11 14:22:00 +0200
commiteb8c949551ffb8b3600357d7ff2bebe750af96e5 (patch)
tree36a0b46219ae595313b569c5885c470a0ddaca0e /shard/lib/app
parent7b6042205e7c6135fae4e0d21dbf7a5975e8491b (diff)
downloadshard-eb8c949551ffb8b3600357d7ff2bebe750af96e5.tar.gz
shard-eb8c949551ffb8b3600357d7ff2bebe750af96e5.zip
Address, broadcast group management
Diffstat (limited to 'shard/lib/app')
-rw-r--r--shard/lib/app/chat.ex123
-rw-r--r--shard/lib/app/identity.ex19
2 files changed, 67 insertions, 75 deletions
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex
index 0f4d573..051dfef 100644
--- a/shard/lib/app/chat.ex
+++ b/shard/lib/app/chat.ex
@@ -5,7 +5,7 @@ defmodule SApp.Chat do
Chat rooms are globally identified by their channel name.
A chat room manifest is of the form:
- {:chat, channel_name}
+ %SApp.Chat.Manifest{channel: channel_name}
Future improvements:
- message signing
@@ -28,7 +28,7 @@ defmodule SApp.Chat do
defimpl Shard.Manifest, for: Manifest do
def start(m) do
- DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, m.channel})
+ DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, m})
end
end
@@ -36,15 +36,15 @@ defmodule SApp.Chat do
@doc """
Start a process that connects to a given channel
"""
- def start_link(channel) do
- GenServer.start_link(__MODULE__, channel)
+ def start_link(manifest) do
+ GenServer.start_link(__MODULE__, manifest)
end
@doc """
Initialize channel process.
"""
- def init(channel) do
- manifest = %Manifest{channel: channel}
+ def init(manifest) do
+ %Manifest{channel: channel} = manifest
id = SData.term_hash manifest
case Shard.Manager.register(id, manifest, self()) do
@@ -62,10 +62,12 @@ defmodule SApp.Chat do
mst = %MST{store: %SApp.PageStore{pid: page_store},
cmp: &msg_cmp/2,
root: root}
- GenServer.cast(self(), :init_pull)
+ group = %SNet.PubShardGroup{id: id}
+ SNet.Group.init_lookup(group, self())
{:ok,
%{channel: channel,
id: id,
+ group: group,
manifest: manifest,
page_store: page_store,
mst: mst,
@@ -96,18 +98,6 @@ defmodule SApp.Chat do
end
@doc """
- Implementation of the :init_pull handler, which is called when the
- process starts. It contacts all currently connected peers and asks them to
- send data for this channel if they have some.
- """
- def handle_cast(:init_pull, state) do
- for {_, pid, _} <- Shard.Manager.list_connections do
- GenServer.cast(pid, {:send_msg, {:interested, [state.id]}})
- end
- {:noreply, state}
- end
-
- @doc """
Implementation of the :chat_send handler. This is the main handler that is used
to send a message to the chat room. Puts the message in the store and syncs
with all connected peers.
@@ -129,9 +119,7 @@ defmodule SApp.Chat do
end
notif = {:append, prev_root, msgitem, mst.root}
- for peer_info <- Shard.Manager.get_shard_peers(state.id) do
- Shard.Manager.send(peer_info, {state.id, nil, notif})
- end
+ SNet.Group.broadcast(state.group, notif)
{:noreply, state}
end
@@ -140,8 +128,10 @@ defmodule SApp.Chat do
Implementation of the :interested handler, this is called when a peer we are
connected to asks to recieve data for this channel.
"""
- def handle_cast({:interested, conn_pid, _auth}, state) do
- Shard.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root}})
+ def handle_cast({:interested, conn_pid, auth}, state) do
+ if SNet.Group.in_group?(state.group, conn_pid, auth) do
+ Shard.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root}})
+ end
{:noreply, state}
end
@@ -160,54 +150,59 @@ defmodule SApp.Chat do
- `{:info, start, list, rest}`: put some messages and informs of the
Merkle hash of the store of older messages.
"""
- def handle_cast({:msg, conn_pid, _auth, _shard_id, nil, msg}, state) do
- state = case msg do
- {:get_manifest} ->
- Shard.Manager.send_pid(conn_pid, {state.id, nil, {:manifest, state.manifest}})
- state
- {:append, prev_root, msgitem, new_root} ->
- # Append message: one single mesage has arrived
- if new_root == state.mst.root do
- # We already have the message, do nothing
+ def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do
+ if not SNet.Group.in_group?(state.group, conn_pid, auth) do
+ # Ignore message
+ {:noreply, state}
+ else
+ state = case msg do
+ {:get_manifest} ->
+ Shard.Manager.send_pid(conn_pid, {state.id, nil, {:manifest, state.manifest}})
state
- else
- # Try adding the message
- {pk, bin, sign} = msgitem
- if Shard.Keys.verify(pk, bin, sign) == :ok do
- if prev_root == state.mst.root do
- # Only one new message, insert it directly
- mst2 = MST.insert(state.mst, msgitem)
- if mst2.root == new_root do
- state = %{state | mst: mst2}
- GenServer.cast(state.page_store, {:set_roots, [mst2.root]})
- Shard.Manager.save_state(state.id, mst2.root)
- msg_callback(state, msgitem)
- state
+ {:append, prev_root, msgitem, new_root} ->
+ # Append message: one single mesage has arrived
+ if new_root == state.mst.root do
+ # We already have the message, do nothing
+ state
+ else
+ # Try adding the message
+ {pk, bin, sign} = msgitem
+ if Shard.Keys.verify(pk, bin, sign) == :ok do
+ if prev_root == state.mst.root do
+ # Only one new message, insert it directly
+ mst2 = MST.insert(state.mst, msgitem)
+ if mst2.root == new_root do
+ state = %{state | mst: mst2}
+ GenServer.cast(state.page_store, {:set_roots, [mst2.root]})
+ Shard.Manager.save_state(state.id, mst2.root)
+ msg_callback(state, msgitem)
+ state
+ else
+ Logger.warn("Invalid new root after inserting same message item!")
+ state
+ end
else
- Logger.warn("Invalid new root after inserting same message item!")
- state
+ # Not a simple one-insertion transition, look at the whole tree
+ init_merge(state, new_root, conn_pid)
end
else
- # Not a simple one-insertion transition, look at the whole tree
- init_merge(state, new_root, conn_pid)
+ Logger.warn("Received message with invalid signature")
+ state
end
- else
- Logger.warn("Received message with invalid signature")
+ end
+ {:root, new_root} ->
+ if new_root == state.mst.root do
+ # already up to date, ignore
state
+ else
+ init_merge(state, new_root, conn_pid)
end
- end
- {:root, new_root} ->
- if new_root == state.mst.root do
- # already up to date, ignore
+ x ->
+ Logger.info("Unhandled message: #{inspect x}")
state
- else
- init_merge(state, new_root, conn_pid)
- end
- x ->
- Logger.info("Unhandled message: #{inspect x}")
- state
+ end
+ {:noreply, state}
end
- {:noreply, state}
end
defp init_merge(state, new_root, source_peer_pid) do
@@ -260,7 +255,7 @@ defmodule SApp.Chat do
end
end
- defp msg_cmp({pk1, msgbin1, _sign1}, {pk2, msgbin2, _sign2}) do
+ def msg_cmp({pk1, msgbin1, _sign1}, {pk2, msgbin2, _sign2}) do
{ts1, msg1} = SData.term_unbin msgbin1
{ts2, msg2} = SData.term_unbin msgbin2
cond do
diff --git a/shard/lib/app/identity.ex b/shard/lib/app/identity.ex
index 6909ad3..d2748a1 100644
--- a/shard/lib/app/identity.ex
+++ b/shard/lib/app/identity.ex
@@ -50,20 +50,17 @@ defmodule SApp.Identity do
def find_proc(pk) do
manifest = %Manifest{pk: pk}
id = SData.term_hash manifest
- Shard.Manager.find_proc id
+ case Shard.Manager.find_proc id do
+ nil ->
+ Shard.Manifest.start manifest
+ pid -> pid
+ end
end
def get_nick(pk) do
- case find_proc pk do
- nil ->
- if Shard.Keys.valid_identity_pk? pk do
- Shard.Manifest.start %Manifest{pk: pk}
- end
- default_nick pk
- pid ->
- info = GenServer.call(pid, :get_info)
- info.nick
- end
+ pid = find_proc pk
+ info = GenServer.call(pid, :get_info)
+ info.nick
end
def handle_call(:manifest, _from, state) do