aboutsummaryrefslogblamecommitdiff
path: root/shard/lib/app/chat.ex
blob: 8a72d48da2124774247787f27c4375a4739f99e7 (plain) (tree)
1
2
3
4
5
6
7
8
9
                      





                                                                 
                                                
 



                                                                           
                      
                                                               
                                                               

     

               

                                       
 
 



             
                       
                 


                                                                 

       
                        
 

                                   
                                 


       
                                





                                                                                            

                        


                                                                              
                       
                                                             
       
 

                                   



                                                            






              
                    



                                        


                                                                          
          


                                                                                          
     

                                              

     
                       
                                 
 




                                             
       







                                                                            
                                                        








                                                                                              
                    







                               





                                             
                                                 
                                            


                                

                                                                   


                        













                                               




                                                            





                                           
                                                  






                                                 


                                                      


                                        
                     
 

                               
                                                        


         
                                                                    



                                               
 
                                                        
                                                                    
                     

     
                                                          
                                                              
                                                                                     
       
                     

     
                                              
                        



                                           
                                                                        
                                                                  



                         













                                                          
                                                                         
                                   
                                              
                                                                                                     




                                                                                    
                  

                                                                               
                 
                

                                                                    
               
             

                                                  
                                        
                 

                                                 
             



                                                                                            

                                                        
               

                       
       

     
                                                      

                             





                                                                                                           
 
                                                                                         
 
                                           
                                                               
                                       
 






                                                     
         
 
                             


                                
                                                              

                                   


                                                                                                                               
             



                                                                                         
       

     




                                                                  



                                                                                 
                                                 

                               
                                                                   

         

     
                                                                 

                                          


                          

                          



                            
      









                                                                              
                                                


    
                                                


                                          
                                                    










































                                                                                    
   
defmodule SApp.Chat do
  @moduledoc """
  Shard application for a replicated chat room with full history.

  Chat rooms are globally identified by their channel name.
  A chat room manifest is of the form:

      %SApp.Chat.Manifest{channel: channel_name}

  A private chat room manifest is of the form:

      %SApp.Chat.PrivChat.Manifest{pk_list: ordered_list_of_authorized_pks}

  Future improvements:
  - use a DHT to find peers that are interested in this channel
  - partial synchronization only == data distributed over peers
  """

  use GenServer

  require Logger
  alias SData.MerkleSearchTree, as: MST


  # =========
  # MANIFESTS
  # =========

  defmodule Manifest do
    @moduledoc """
    Manifest identifying a public chat room by its channel name.

    Example:

        %SApp.Chat.Manifest{channel: "test"}
    """

    defstruct channel: nil

    defimpl Shard.Manifest do
      # Shards described by this manifest are run by the SApp.Chat module.
      def module(_manifest) do
        SApp.Chat
      end

      # Every public chat manifest is accepted as valid.
      def is_valid?(_manifest) do
        true
      end
    end
  end

  defmodule PrivChat.Manifest do
    @moduledoc """
    Manifest for a private chat room defined by the list of participants.

    Do not instantiate this struct directly, use `new/1` to ensure a canonical representation.
    """

    defstruct [:pk_list]

    @doc """
    Builds a private chat manifest from a list of participant public keys.

    Ensures a canonical representation by sorting pks and removing duplicates.
    """
    def new(pk_list) do
      %__MODULE__{pk_list: pk_list |> Enum.sort() |> Enum.uniq()}
    end

    defimpl Shard.Manifest do
      def module(_m), do: SApp.Chat

      # A manifest is valid when pk_list is a list of 32-byte binaries in
      # canonical form (sorted, without duplicates). The is_binary/1 check
      # comes first because byte_size/1 raises on non-binaries: a malformed
      # manifest must be reported as invalid rather than crash the caller.
      def is_valid?(%{pk_list: pk_list}) when is_list(pk_list) do
        Enum.all?(pk_list, &(is_binary(&1) and byte_size(&1) == 32)) and
          pk_list == pk_list |> Enum.sort() |> Enum.uniq()
      end

      # pk_list is not even a list: the manifest cannot be valid.
      def is_valid?(_m), do: false
    end
  end

  # ==========
  # MAIN LOGIC
  # ==========

  defmodule State do
    @moduledoc """
    Internal state struct of chat shard.
    """

    # id:         shard identifier computed from the manifest in init/1
    # netgroup:   network group struct used for peer lookup and broadcasts
    # manifest:   manifest this shard was started with
    # page_store: pid of the SApp.PageStore process backing the tree
    # mst:        Merkle search tree holding the message history
    # subs:       MapSet of pids subscribed to chat notifications
    # read:       tree root recorded by the last :mark_read
    defstruct id: nil,
              netgroup: nil,
              manifest: nil,
              page_store: nil,
              mst: nil,
              subs: nil,
              read: nil
  end
  
  @doc """
  Starts a chat shard process linked to the caller for the given manifest.

  This is not meant to be called directly; go through the shard manager, e.g.:

      Shard.Manager.find_or_start %SApp.Chat.Manifest{channel: "my_chan"}
  """
  def start_link(manifest), do: GenServer.start_link(__MODULE__, manifest)

  # GenServer init callback: builds the initial %State{} for the chat room
  # described by `manifest` (a %Manifest{} or a %PrivChat.Manifest{}).
  # Any other manifest shape fails the `case` below and crashes the start.
  def init(manifest) do
    # The shard id is derived from the manifest itself.
    id = SData.term_hash manifest

    # Public rooms use a group keyed by the shard id; private rooms use a
    # group restricted to the listed public keys.
    netgroup = case manifest do
      %Manifest{channel: _channel} ->
        %SNet.PubShardGroup{id: id}
      %PrivChat.Manifest{pk_list: pk_list} ->
        %SNet.PrivGroup{pk_list: pk_list}
    end
    # NOTE(review): presumably registers this process as the handler for
    # messages addressed to {id, nil} -- confirm against Shard.Manager.
    Shard.Manager.dispatch_to(id, nil, self())
    {:ok, page_store} = SApp.PageStore.start_link(id, :page_store, netgroup)
    # Restore the saved tree root and read marker, if a saved state with
    # both keys exists; otherwise start with neither.
    {root, read} = case Shard.Manager.load_state id do
      %{root: root, read: read} -> {root, read}
      _ -> {nil, nil}
    end
    # Keep the saved root only if the page store reports having it
    # (have_rec? presumably checks the whole subtree -- confirm); otherwise
    # fall back to an empty tree.
    root = cond do
      root == nil -> nil
      SApp.PageStore.have_rec?(page_store, root) -> root
      true ->
        Logger.warn "Not all pages for saved root were saved, restarting from an empty state!"
        nil
    end
    # The tree stores its pages in the page store and orders items with msg_cmp/2.
    mst = %MST{store: %SApp.PageStore{pid: page_store},
               cmp: &msg_cmp/2,
               root: root}
    # NOTE(review): presumably starts peer discovery for this group -- confirm
    # against SNet.Group.
    SNet.Group.init_lookup(netgroup, self())
    {:ok,
      %State{id: id,
        netgroup: netgroup,
        manifest: manifest,
        page_store: page_store,
        mst: mst,
        subs: MapSet.new,
        read: read,
      }
    }
  end

  # Replies with the manifest this shard was started with; state is unchanged.
  def handle_call(:manifest, _from, %{manifest: manifest} = state),
    do: {:reply, manifest, state}

  # Deletes the shard's page store, replies :ok and stops this process
  # with reason :normal.
  def handle_call(:delete_shard, _from, state) do
    # The page store pid lives under :page_store; the State struct has no
    # :store field, so reading state.store would raise a KeyError.
    SApp.PageStore.delete_store(state.page_store)
    {:stop, :normal, :ok, state}
  end

  # Replies with whatever MST.last/3 returns for the given upper bound and
  # item count; state is unchanged.
  def handle_call({:read_history, top_bound, num}, _from, %{mst: mst} = state) do
    {:reply, MST.last(mst, top_bound, num), state}
  end

  # Tree root equals the root recorded by the last :mark_read: nothing unread.
  def handle_call(:has_unread, _from, %{mst: %{root: root}, read: read} = state)
      when root == read do
    {:reply, nil, state}
  end

  # The root changed since the last :mark_read: reply with the timestamp of
  # the most recent message, or nil when the tree holds no message.
  def handle_call(:has_unread, _from, state) do
    last_ts =
      case MST.last(state.mst, nil, 1) do
        [{{_, msgbin, _}, true}] ->
          {ts, _} = SData.term_unbin(msgbin)
          ts

        [] ->
          nil
      end

    {:reply, last_ts, state}
  end

  # Reports this shard's dependency list to Shard.Manager; the list sent is
  # always empty.
  def handle_cast(:send_deps, %{id: shard_id} = state) do
    deps_msg = {:dep_list, shard_id, []}
    GenServer.cast(Shard.Manager, deps_msg)
    {:noreply, state}
  end

  # Records the current tree root as the read marker and persists it.
  def handle_cast(:mark_read, state) do
    marked = %{state | read: state.mst.root}
    save_state(marked)
    {:noreply, marked}
  end

  # Sends a local message: timestamps and signs `msg` with the key `pk`,
  # inserts it in the tree, persists the new root, notifies local
  # subscribers with {:chat_send, manifest, msgitem} and broadcasts an
  # :append notification to the network group.
  def handle_cast({:chat_send, pk, msg}, state) do
    # The new timestamp is strictly greater than the latest stored one,
    # even when the local clock is behind it.
    next_ts = case MST.last(state.mst, nil, 1) do
      [] -> System.os_time(:second)
      [{{_, msgbin, _}, true}] ->
        {ts, _msg} = SData.term_unbin msgbin
        max(ts + 1, System.os_time(:second))
    end
    msgbin = SData.term_bin {next_ts, msg}
    {:ok, sign} = Shard.Keys.sign_detached(pk, msgbin)
    msgitem = {pk, msgbin, sign}

    prev_root = state.mst.root
    mst = MST.insert(state.mst, msgitem)
    # Declare the new root to the page store, as done after every other
    # tree update in this module (remote append and merge).
    SApp.PageStore.set_roots(state.page_store, [mst.root])
    state = %{state | mst: mst}
    save_state(state)

    for pid <- state.subs do
      if Process.alive?(pid) do
        send(pid, {:chat_send, state.manifest, msgitem})
      end
    end

    # Peers whose root equals prev_root can apply the message directly.
    notif = {state.id, nil, {:append, prev_root, msgitem, mst.root}}
    SNet.Group.broadcast(state.netgroup, notif)

    {:noreply, state}
  end

  # A peer connection came up: ask the connection process to tell the peer
  # that we are interested in this shard.
  def handle_cast({:peer_connected, conn_pid}, %{id: shard_id} = state) do
    interest = {:interested, [shard_id]}
    GenServer.cast(conn_pid, {:send_msg, interest})
    {:noreply, state}
  end

  # A peer declared interest in this shard: if it belongs to the network
  # group, send it our current root and ask for its own in reply.
  def handle_cast({:interested, conn_pid, auth}, state) do
    authorized? = SNet.Group.in_group?(state.netgroup, conn_pid, auth)

    if authorized? do
      root_msg = {state.id, nil, {:root, state.mst.root, true}}
      SNet.Manager.send_pid(conn_pid, root_msg)
    end

    {:noreply, state}
  end

  # Adds `pid` to the subscriber set and monitors it, so that the :DOWN
  # handler can remove it when the process exits.
  def handle_cast({:subscribe, pid}, %{subs: subs} = state) do
    Process.monitor(pid)
    {:noreply, %{state | subs: MapSet.put(subs, pid)}}
  end

  # Handles a message received from a peer connection for this shard.
  #
  # Messages from connections that are not in the network group are dropped.
  # Two payloads are understood:
  #   {:append, prev_root, msgitem, new_root} - a single new message
  #   {:root, new_root, ask_reply}            - the peer's current tree root
  # Anything else is logged and ignored.
  def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do
    if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
      # Ignore message
      {:noreply, state}
    else
      state = case msg do
        {:append, prev_root, msgitem, new_root} ->
          # Append message: one single message has arrived
          if new_root == state.mst.root do
            # We already have the message, do nothing
            state
          else
            # Try adding the message
            {pk, bin, sign} = msgitem
            if Shard.Keys.verify(pk, bin, sign) == :ok do
              if prev_root == state.mst.root do
                # Only one new message, insert it directly
                mst2 = MST.insert(state.mst, msgitem)
                # Accept the insertion only if it yields the root announced
                # by the peer.
                if mst2.root == new_root do
                  state = %{state | mst: mst2}
                  SApp.PageStore.set_roots(state.page_store, [mst2.root])
                  save_state(state)
                  msg_callback(state, msgitem)
                  # Relay to the rest of the group, except the sender.
                  SNet.Group.broadcast(state.netgroup, {state.id, nil, msg}, exclude_pid: [conn_pid])
                  state
                else
                  Logger.warn("Invalid new root after inserting same message item!")
                  state
                end
              else
                # Not a simple one-insertion transition, look at the whole tree
                init_merge(state, new_root, conn_pid)
              end
            else
              Logger.warn("Received message with invalid signature")
              state
            end
          end
        {:root, new_root, ask_reply} ->
          state = if new_root == state.mst.root do
            # already up to date, ignore
            state
          else
            init_merge(state, new_root, conn_pid)
          end
          # Answer with our (possibly merged) root; the reply carries
          # ask_reply = false so the peer does not answer back again.
          if ask_reply do
            SNet.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root, false}})
          end
          state
        x ->
          Logger.info("Unhandled message: #{inspect x}")
          state
      end
      {:noreply, state}
    end
  end

  # Merges the remote tree rooted at `new_root` into the local tree.
  #
  # Pages of the remote tree are requested preferably from `source_peer_pid`.
  # Items returned by MST.last(_, nil, 100) on the merged tree that were not
  # returned for the local tree are signature-checked. If all are valid,
  # subscribers are notified, the new root is stored and saved, and it is
  # broadcast to the group (except the source peer) when it changed.
  # Otherwise the merged data is dropped.
  #
  # Returns the new state, or the unchanged state when `new_root` is nil or
  # a signature is invalid.
  #
  # NOTE(review): only the items returned by MST.last(mst, nil, 100) are
  # verified; anything merged beyond that window is not checked here.
  defp init_merge(state, new_root, source_peer_pid) do
    old_root = state.mst.root

    if new_root == nil do
      state
    else
      # TODO: make the merge asynchronous

      Logger.info("Starting merge for #{inspect state.manifest}, merging root: #{new_root|>Base.encode16}")

      prev_last = for {x, true} <- MST.last(state.mst, nil, 100), into: MapSet.new, do: x

      mgmst = %{state.mst | root: new_root}
      mgmst = put_in(mgmst.store.prefer_ask, [source_peer_pid])
      mst = MST.merge(state.mst, mgmst)

      new = for {x, true} <- MST.last(mst, nil, 100),
              not MapSet.member?(prev_last, x)
            do x end

      # verify/3 signals success with :ok. Any other result must count as a
      # failure: testing the raw results for truthiness would accept error
      # atoms and tuples, since only nil and false are falsy.
      all_valid? = Enum.all?(new, fn {pk, bin, sign} ->
        Shard.Keys.verify(pk, bin, sign) == :ok
      end)

      if all_valid? do
        for x <- new do
          msg_callback(state, x)
        end
        SApp.PageStore.set_roots(state.page_store, [mst.root])
        state = %{state | mst: mst}
        save_state(state)
        if state.mst.root != old_root do
          SNet.Group.broadcast(state.netgroup, {state.id, nil, {:root, state.mst.root, false}}, exclude_pid: [source_peer_pid])
        end
        state
      else
        Logger.warn("Incorrect signatures somewhere while merging, dropping merged data")
        state
      end
    end
  end

  # A monitored subscriber exited: remove it from the subscriber set.
  def handle_info({:DOWN, _ref, :process, pid, _reason}, %{subs: subs} = state) do
    {:noreply, %{state | subs: MapSet.delete(subs, pid)}}
  end

  # Persists the tree root and the read marker through Shard.Manager,
  # keyed by the shard id.
  defp save_state(%{id: id, mst: %{root: root}, read: read}) do
    Shard.Manager.save_state(id, %{root: root, read: read})
  end

  # Delivers {:chat_recv, manifest, msgitem} to every subscriber process
  # that is still alive.
  defp msg_callback(state, {pk, msgbin, sign}) do
    event = {:chat_recv, state.manifest, {pk, msgbin, sign}}

    Enum.map(state.subs, fn pid ->
      if Process.alive?(pid), do: send(pid, event)
    end)
  end

  # Ordering function for message items in the tree: items are ordered by
  # timestamp, then by public key, then by message content. Two items equal
  # on all three are reported as :duplicate (signatures are not compared).
  defp msg_cmp({pk1, msgbin1, _sign1}, {pk2, msgbin2, _sign2}) do
    {ts1, body1} = SData.term_unbin(msgbin1)
    {ts2, body2} = SData.term_unbin(msgbin2)

    # Tuples of equal size compare element by element, which gives the
    # timestamp / key / content precedence directly.
    left = {ts1, pk1, body1}
    right = {ts2, pk2, body2}

    cond do
      left > right -> :after
      left < right -> :before
      true -> :duplicate
    end
  end

  # ================
  # PUBLIC INTERFACE
  # ================
  
  @doc """
  Subscribe the calling process to notifications for this chat room.

  The calling process will start receiving messages of the form:

      {:chat_recv, manifest, {pk, msgbin, sign}}

  for messages received from peers, or

      {:chat_send, manifest, {pk, msgbin, sign}}

  for messages sent locally through this shard.

  msgbin can be decoded in the following way:

      {timestamp, message} = SData.term_unbin msgbin
  """
  def subscribe(shard_pid), do: GenServer.cast(shard_pid, {:subscribe, self()})

  @doc """
  Send the message `msg` to a chat room, signed with the key `pk`.

  This is an asynchronous cast: it returns `:ok` immediately.
  """
  def chat_send(shard_pid, pk, msg), do: GenServer.cast(shard_pid, {:chat_send, pk, msg})

  @doc """
  Read the history of a chat room.

  `bound` is the last message to read:

  - if nil, the `n` last messages are read;
  - if not nil, the `n` last messages up to the specified bound are read.
  """
  def read_history(shard_pid, bound, n), do: GenServer.call(shard_pid, {:read_history, bound, n})

  @doc """
  Return the manifest of the shard running as `shard_pid`.
  """
  def get_manifest(shard_pid), do: GenServer.call(shard_pid, :manifest)

  @doc """
  Returns the timestamp of the last message if the chat room has unread messages,
  nil otherwise.
  """
  def has_unread?(shard_pid), do: GenServer.call(shard_pid, :has_unread)

  @doc """
  Mark all messages of the chat room as read.
  """
  def mark_read(shard_pid), do: GenServer.cast(shard_pid, :mark_read)
end