aboutsummaryrefslogtreecommitdiff
path: root/lib/app/chat.ex
diff options
context:
space:
mode:
Diffstat (limited to 'lib/app/chat.ex')
-rw-r--r--lib/app/chat.ex86
1 files changed, 55 insertions, 31 deletions
diff --git a/lib/app/chat.ex b/lib/app/chat.ex
index e28e896..051fab6 100644
--- a/lib/app/chat.ex
+++ b/lib/app/chat.ex
@@ -19,7 +19,8 @@ defmodule SApp.Chat do
use GenServer
- alias SData.MerkleList, as: ML
+ require Logger
+ alias SData.MerkleSearchTree, as: MST
@doc """
Start a process that connects to a given channel
@@ -32,19 +33,22 @@ defmodule SApp.Chat do
Initialize channel process.
"""
def init(channel) do
- store = ML.new(&msg_cmp/2)
manifest = {:chat, channel}
id = SData.term_hash manifest
case Shard.Manager.register(id, manifest, self()) do
:ok ->
Shard.Manager.dispatch_to(id, nil, self())
+ {:ok, block_store} = SApp.BlockStore.start_link(id, :block_store)
+ mst = %MST{store: %SApp.BlockStore{pid: block_store},
+ cmp: &msg_cmp/2}
GenServer.cast(self(), :init_pull)
{:ok,
%{channel: channel,
id: id,
manifest: manifest,
- store: store,
+ block_store: block_store,
+ mst: mst,
subs: MapSet.new,
}
}
@@ -60,8 +64,8 @@ defmodule SApp.Chat do
{:reply, state.manifest, state}
end
- def handle_call({:read_history, start, num}, _from, state) do
- ret = ML.read(state.store, start, num)
+ def handle_call({:read_history, top_bound, num}, _from, state) do
+ ret = MST.last(state.mst, top_bound, num)
{:reply, ret, state}
end
@@ -86,7 +90,9 @@ defmodule SApp.Chat do
msgitem = {(System.os_time :seconds),
Shard.Identity.get_nickname(),
msg}
- new_state = %{state | store: ML.insert(state.store, msgitem)}
+ prev_root = state.mst.root
+ mst = MST.insert(state.mst, msgitem)
+ state = %{state | mst: mst}
for pid <- state.subs do
if Process.alive?(pid) do
@@ -94,11 +100,12 @@ defmodule SApp.Chat do
end
end
+ notif = {:append, prev_root, msgitem, mst.root}
for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do
- push_messages(new_state, peer_id, nil, 5)
+ Shard.Manager.send(peer_id, {state.id, nil, notif})
end
- {:noreply, new_state}
+ {:noreply, state}
end
@doc """
@@ -106,7 +113,7 @@ defmodule SApp.Chat do
connected to asks to recieve data for this channel.
"""
def handle_cast({:interested, peer_id}, state) do
- push_messages(state, peer_id, nil, 10)
+ Shard.Manager.send(peer_id, {state.id, nil, {:root, state.mst.root}})
{:noreply, state}
end
@@ -126,27 +133,52 @@ defmodule SApp.Chat do
Merkle hash of the store of older messages.
"""
def handle_cast({:msg, peer_id, _shard_id, nil, msg}, state) do
- case msg do
+ state = case msg do
{:get_manifest} ->
Shard.Manager.send(peer_id, {state.id, nil, {:manifest, state.manifest}})
- {:get, start} -> push_messages(state, peer_id, start, 20)
- {:info, _start, list, rest} ->
- if rest != nil and not ML.has(state.store, rest) do
- Shard.Manager.send(peer_id, {state.id, nil, {:get, rest}})
+ state
+ {:append, prev_root, msgitem, new_root} ->
+ # Append message: one single mesage has arrived
+ if new_root == state.mst.root do
+ # We already have the message, do nothing
+ state
+ else
+ # Try adding the message
+ if prev_root == state.mst.root do
+ mst2 = MST.insert(state.mst, msgitem)
+ if mst2.root == new_root do
+ # This was the only message missing, we are happy!
+ state = %{state | mst: mst2}
+ msg_callback(state, msgitem)
+ state
+ else
+ # More messages are missing, start a full merge
+ init_merge(state, new_root, peer_id)
+ end
+ else
+ init_merge(state, new_root, peer_id)
+ end
+ end
+ {:root, new_root} ->
+ if new_root == state.mst.root do
+ # already up to date, ignore
+ state
+ else
+ init_merge(state, new_root, peer_id)
end
- who = self()
- spawn_link(fn ->
- Process.sleep 1000
- GenServer.cast(who, {:deferred_insert, list})
- end)
- _ -> nil
+ x ->
+ Logger.info("Unhandled message: #{inspect x}")
+ state
end
{:noreply, state}
end
- def handle_cast({:deferred_insert, list}, state) do
- new_store = ML.insert_many(state.store, list, (fn msg -> msg_callback(state, msg) end))
- {:noreply, %{state | store: new_store}}
+ defp init_merge(state, new_root, source_peer) do
+ # TODO: make the merge asynchronous
+ mgmst = %{state.mst | root: new_root}
+ mgmst = put_in(mgmst.store.prefer_ask, [source_peer])
+ mst = MST.merge(state.mst, mgmst, fn msgitem, true -> msg_callback(state, msgitem) end)
+ %{state | mst: mst}
end
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
@@ -154,14 +186,6 @@ defmodule SApp.Chat do
{:noreply, %{ state | subs: new_subs }}
end
- defp push_messages(state, to, start, num) do
- case ML.read(state.store, start, num) do
- {:ok, list, rest} ->
- Shard.Manager.send(to, {state.id, nil, {:info, start, list, rest}})
- _ -> nil
- end
- end
-
defp msg_callback(state, {ts, nick, msg}) do
for pid <- state.subs do
if Process.alive?(pid) do