aboutsummaryrefslogblamecommitdiff
path: root/shard/lib/app/chat.ex
blob: 8d55cda109333d9002cf97a9ac497bb9fa280bf6 (plain) (tree)
1
2
3
4
5
6
7
8
9
                      





                                                                 
                                                
 



                                                                           


                                            


                                                               
                                                               

     

               

                                       
 
 



             





                                          
                                                                            



       



                                
                                                             












                                                                            


                                                  

                                              

     


                             
                       
                                 
 

                                                        





                                                 
                                                  
                                                                                



                                                          






                                                                                                  
                                                           
                                   
                              
                                                
             

                               
                               
                                   
                     
                             
                       


                   
                        
       

     


                                                                            



                                             

                                                                   


                        



















                                               
          



                                                                                 




                                                            


                                        
                     
 

                               
                                                        


         
                                                                    



                                               
 
                                                        
                                                                    
                     

     



                                                                              
                                                          
                                                              
                                                                                     
       
                     

     
                                              
                        



                                           








                                                                            
                                                                        
                                                                  




                          
                                                                                       
               














                                                                             
                                   





                                                                                    
                  

                                                                               
                 
                

                                                                    
               
             

                                                  
                                        
                 

                                                 
             



                                                                                            

                                                        
               

                       
       

     
                                                      





                                                                                                           
 
                                                                                         
 
                                           
                                                               
                                       
 






                                                     
         
 
                             


                                
                                                                  


                                   



                                                                                         
       

     




                                                                  



                                                                                 
                                                 

                               
                                                                   

         

     
                                                                

                                          


                          

                          



                            

      
defmodule SApp.Chat do
  @moduledoc """
  Shard application for a replicated chat room with full history.

  Chat rooms are globally identified by their channel name.
  A chat room manifest is of the form:

      %SApp.Chat.Manifest{channel: channel_name}

  A private chat room manifest is of the form:

      %SApp.Chat.PrivChat.Manifest{pk_list: ordered_list_of_authorized_pks}

  Future improvements:
  - message signing
  - storage of the chatroom messages to disk
  - use a DHT to find peers that are interested in this channel
  - epidemic broadcast (carefull not to be too costly,
    maybe by limiting the number of peers we talk to)
  - partial synchronization only == data distributed over peers
  """

  use GenServer

  require Logger
  alias SData.MerkleSearchTree, as: MST


  # =========
  # MANIFESTS
  # =========

  defmodule Manifest do
    defstruct [:channel]
  end

  defimpl Shard.Manifest, for: Manifest do
    def start(m) do
      DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, m})
    end
  end


  defmodule PrivChat.Manifest do
    defstruct [:pk_list]

    def new(pk_list) do
      %__MODULE__{pk_list: pk_list |> Enum.sort |> Enum.uniq}
    end
  end

  defimpl Shard.Manifest, for: PrivChat.Manifest do
    def start(m) do
      DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, m})
    end
  end

  # ==========
  # MAIN LOGIC
  # ==========

  @doc """
  Start a process that connects to a given channel
  """
  def start_link(manifest) do
    GenServer.start_link(__MODULE__, manifest)
  end

  @doc """
  Initialize channel process.
  """
  def init(manifest) do
    id = SData.term_hash manifest

    case Shard.Manager.register(id, manifest, self()) do
      :ok ->
        netgroup = case manifest do
          %Manifest{channel: _channel} ->
            %SNet.PubShardGroup{id: id}
          %PrivChat.Manifest{pk_list: pk_list} ->
            %SNet.PrivGroup{pk_list: pk_list}
        end
        Shard.Manager.dispatch_to(id, nil, self())
        {:ok, page_store} = SApp.PageStore.start_link(id, :page_store, netgroup)
        {root, read} = case Shard.Manager.load_state id do
          %{root: root, read: read} -> {root, read}
          _ -> {nil, nil}
        end
        root = cond do
          root == nil -> nil
          GenServer.call(page_store, {:have_rec, root}) -> root
          true ->
            Logger.warn "Not all pages for saved root were saved, restarting from an empty state!"
            nil
        end
        mst = %MST{store: %SApp.PageStore{pid: page_store},
                   cmp: &msg_cmp/2,
                   root: root}
        SNet.Group.init_lookup(netgroup, self())
        {:ok,
          %{id: id,
            netgroup: netgroup,
            manifest: manifest,
            page_store: page_store,
            mst: mst,
            subs: MapSet.new,
            read: read,
          }
        }
      :redundant ->
        exit(:redundant)
    end
  end

  @doc """
  Implementation of the :manifest call that returns the chat room's manifest
  """
  def handle_call(:manifest, _from, state) do
    {:reply, state.manifest, state}
  end

  def handle_call({:read_history, top_bound, num}, _from, state) do
    ret = MST.last(state.mst, top_bound, num)
    {:reply, ret, state}
  end

  def handle_call(:has_unread, _from, state) do
    if state.mst.root != state.read do
      case MST.last(state.mst, nil, 1) do
        [{{_, msgbin, _}, true}] ->
          {ts, _} = SData.term_unbin msgbin
          {:reply, ts, state}
        [] ->
          {:reply, nil, state}
      end
    else
      {:reply, nil, state}
    end
  end

  def handle_cast(:mark_read, state) do
    state = %{state | read: state.mst.root}
    save_state(state)
    {:noreply, state}
  end

  @doc """
  Implementation of the :chat_send handler. This is the main handler that is used
  to send a message to the chat room. Puts the message in the store and syncs
  with all connected peers.
  """
  def handle_cast({:chat_send, pk, msg}, state) do
    msgbin = SData.term_bin {(System.os_time :seconds), msg}
    {:ok, sign} = Shard.Keys.sign_detached(pk, msgbin)
    msgitem = {pk, msgbin, sign}

    prev_root = state.mst.root
    mst = MST.insert(state.mst, msgitem)
    state = %{state | mst: mst}
    save_state(state)

    for pid <- state.subs do
      if Process.alive?(pid) do
        send(pid, {:chat_send, state.manifest, msgitem})
      end
    end

    notif = {state.id, nil, {:append, prev_root, msgitem, mst.root}}
    SNet.Group.broadcast(state.netgroup, notif)

    {:noreply, state}
  end

  def handle_cast({:peer_connected, conn_pid}, state) do
    GenServer.cast(conn_pid, {:send_msg, {:interested, [state.id]}})
    {:noreply, state}
  end

  @doc """
  Implementation of the :interested handler, this is called when a peer we are
  connected to asks to recieve data for this channel.
  """
  def handle_cast({:interested, conn_pid, auth}, state) do
    if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
      SNet.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root, true}})
    end
    {:noreply, state}
  end

  def handle_cast({:subscribe, pid}, state) do
    Process.monitor(pid)
    new_subs = MapSet.put(state.subs, pid)
    {:noreply, %{ state | subs: new_subs }}
  end

  @doc """
  Implementation of the :msg handler, which is the main handler for messages
  comming from other peers concerning this chat room.

  Messages are:
  - `{:get, start}`: get some messages starting at a given Merkle hash
  - `{:info, start, list, rest}`: put some messages and informs of the
    Merkle hash of the store of older messages.
  """
  def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do
    if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
      # Ignore message
      {:noreply, state}
    else
      state = case msg do
        {:get_manifest} ->
          SNet.Manager.send_pid(conn_pid, {state.id, nil, {:manifest, state.manifest}})
          state
        {:append, prev_root, msgitem, new_root} ->
          # Append message: one single mesage has arrived
          if new_root == state.mst.root do
            # We already have the message, do nothing
            state
          else
            # Try adding the message
            {pk, bin, sign} = msgitem
            if Shard.Keys.verify(pk, bin, sign) == :ok do
              if prev_root == state.mst.root do
                # Only one new message, insert it directly
                mst2 = MST.insert(state.mst, msgitem)
                if mst2.root == new_root do
                  state = %{state | mst: mst2}
                  GenServer.cast(state.page_store, {:set_roots, [mst2.root]})
                  save_state(state)
                  msg_callback(state, msgitem)
                  state
                else
                  Logger.warn("Invalid new root after inserting same message item!")
                  state
                end
              else
                # Not a simple one-insertion transition, look at the whole tree
                init_merge(state, new_root, conn_pid)
              end
            else
              Logger.warn("Received message with invalid signature")
              state
            end
          end
        {:root, new_root, ask_reply} ->
          state = if new_root == state.mst.root do
            # already up to date, ignore
            state
          else
            init_merge(state, new_root, conn_pid)
          end
          if ask_reply do
            SNet.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root, false}})
          end
          state
        x ->
          Logger.info("Unhandled message: #{inspect x}")
          state
      end
      {:noreply, state}
    end
  end

  defp init_merge(state, new_root, source_peer_pid) do
    if new_root == nil do
      state
    else
      # TODO: make the merge asynchronous
      
      Logger.info("Starting merge for #{inspect state.manifest}, merging root: #{new_root|>Base.encode16}")

      prev_last = for {x, true} <- MST.last(state.mst, nil, 100), into: MapSet.new, do: x

      mgmst = %{state.mst | root: new_root}
      mgmst = put_in(mgmst.store.prefer_ask, [source_peer_pid])
      mst = MST.merge(state.mst, mgmst)

      new = for {x, true} <- MST.last(mst, nil, 100),
              not MapSet.member?(prev_last, x)
            do x end

      correct = for x <- new do
        {pk, bin, sign} = x
        Shard.Keys.verify(pk, bin, sign)
      end

      if Enum.all? correct do
        for x <- new do
          msg_callback(state, x)
        end
        GenServer.cast(state.page_store, {:set_roots, [mst.root]})
        state = %{state | mst: mst}
        save_state(state)
        state
      else
        Logger.warn("Incorrect signatures somewhere while merging, dropping merged data")
        state
      end
    end
  end

  def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
    new_subs = MapSet.delete(state.subs, pid)
    {:noreply, %{ state | subs: new_subs }}
  end

  defp save_state(state) do
    Shard.Manager.save_state(state.id, %{root: state.mst.root, read: state.read})
  end

  defp msg_callback(state, {pk, msgbin, sign}) do
    for pid <- state.subs do
      if Process.alive?(pid) do
        send(pid, {:chat_recv, state.manifest, {pk, msgbin, sign}})
      end
    end
  end

  def msg_cmp({pk1, msgbin1, _sign1}, {pk2, msgbin2, _sign2}) do
    {ts1, msg1} = SData.term_unbin msgbin1
    {ts2, msg2} = SData.term_unbin msgbin2
    cond do
      ts1 > ts2 -> :after
      ts1 < ts2 -> :before
      pk1 > pk2 -> :after
      pk1 < pk2 -> :before
      msg1 > msg2 -> :after
      msg1 < msg2 -> :before
      true -> :duplicate
    end
  end 
end