diff options
Diffstat (limited to 'lib/app/chat.ex')
-rw-r--r-- | lib/app/chat.ex | 86 |
1 files changed, 55 insertions, 31 deletions
diff --git a/lib/app/chat.ex b/lib/app/chat.ex index e28e896..051fab6 100644 --- a/lib/app/chat.ex +++ b/lib/app/chat.ex @@ -19,7 +19,8 @@ defmodule SApp.Chat do use GenServer - alias SData.MerkleList, as: ML + require Logger + alias SData.MerkleSearchTree, as: MST @doc """ Start a process that connects to a given channel @@ -32,19 +33,22 @@ defmodule SApp.Chat do Initialize channel process. """ def init(channel) do - store = ML.new(&msg_cmp/2) manifest = {:chat, channel} id = SData.term_hash manifest case Shard.Manager.register(id, manifest, self()) do :ok -> Shard.Manager.dispatch_to(id, nil, self()) + {:ok, block_store} = SApp.BlockStore.start_link(id, :block_store) + mst = %MST{store: %SApp.BlockStore{pid: block_store}, + cmp: &msg_cmp/2} GenServer.cast(self(), :init_pull) {:ok, %{channel: channel, id: id, manifest: manifest, - store: store, + block_store: block_store, + mst: mst, subs: MapSet.new, } } @@ -60,8 +64,8 @@ defmodule SApp.Chat do {:reply, state.manifest, state} end - def handle_call({:read_history, start, num}, _from, state) do - ret = ML.read(state.store, start, num) + def handle_call({:read_history, top_bound, num}, _from, state) do + ret = MST.last(state.mst, top_bound, num) {:reply, ret, state} end @@ -86,7 +90,9 @@ defmodule SApp.Chat do msgitem = {(System.os_time :seconds), Shard.Identity.get_nickname(), msg} - new_state = %{state | store: ML.insert(state.store, msgitem)} + prev_root = state.mst.root + mst = MST.insert(state.mst, msgitem) + state = %{state | mst: mst} for pid <- state.subs do if Process.alive?(pid) do @@ -94,11 +100,12 @@ defmodule SApp.Chat do end end + notif = {:append, prev_root, msgitem, mst.root} for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do - push_messages(new_state, peer_id, nil, 5) + Shard.Manager.send(peer_id, {state.id, nil, notif}) end - {:noreply, new_state} + {:noreply, state} end @doc """ @@ -106,7 +113,7 @@ defmodule SApp.Chat do connected to asks to recieve data for this channel. """ def handle_cast({:interested, peer_id}, state) do - push_messages(state, peer_id, nil, 10) + Shard.Manager.send(peer_id, {state.id, nil, {:root, state.mst.root}}) {:noreply, state} end @@ -126,27 +133,52 @@ defmodule SApp.Chat do Merkle hash of the store of older messages. """ def handle_cast({:msg, peer_id, _shard_id, nil, msg}, state) do - case msg do + state = case msg do {:get_manifest} -> Shard.Manager.send(peer_id, {state.id, nil, {:manifest, state.manifest}}) - {:get, start} -> push_messages(state, peer_id, start, 20) - {:info, _start, list, rest} -> - if rest != nil and not ML.has(state.store, rest) do - Shard.Manager.send(peer_id, {state.id, nil, {:get, rest}}) + state + {:append, prev_root, msgitem, new_root} -> + # Append message: one single mesage has arrived + if new_root == state.mst.root do + # We already have the message, do nothing + state + else + # Try adding the message + if prev_root == state.mst.root do + mst2 = MST.insert(state.mst, msgitem) + if mst2.root == new_root do + # This was the only message missing, we are happy! + state = %{state | mst: mst2} + msg_callback(state, msgitem) + state + else + # More messages are missing, start a full merge + init_merge(state, new_root, peer_id) + end + else + init_merge(state, new_root, peer_id) + end + end + {:root, new_root} -> + if new_root == state.mst.root do + # already up to date, ignore + state + else + init_merge(state, new_root, peer_id) end - who = self() - spawn_link(fn -> - Process.sleep 1000 - GenServer.cast(who, {:deferred_insert, list}) - end) - _ -> nil + x -> + Logger.info("Unhandled message: #{inspect x}") + state end {:noreply, state} end - def handle_cast({:deferred_insert, list}, state) do - new_store = ML.insert_many(state.store, list, (fn msg -> msg_callback(state, msg) end)) - {:noreply, %{state | store: new_store}} + defp init_merge(state, new_root, source_peer) do + # TODO: make the merge asynchronous + mgmst = %{state.mst | root: new_root} + mgmst = put_in(mgmst.store.prefer_ask, [source_peer]) + mst = MST.merge(state.mst, mgmst, fn msgitem, true -> msg_callback(state, msgitem) end) + %{state | mst: mst} end def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do @@ -154,14 +186,6 @@ defmodule SApp.Chat do {:noreply, %{ state | subs: new_subs }} end - defp push_messages(state, to, start, num) do - case ML.read(state.store, start, num) do - {:ok, list, rest} -> - Shard.Manager.send(to, {state.id, nil, {:info, start, list, rest}}) - _ -> nil - end - end - defp msg_callback(state, {ts, nick, msg}) do for pid <- state.subs do if Process.alive?(pid) do |