aboutsummaryrefslogblamecommitdiff
path: root/shard/lib/net/group.ex
blob: a5f0867d21c9852ae06d9f17cd1b08c1c2475a68 (plain) (tree)






















                                                                                          
                                       










                                                          
                                                              

                                                                                      
                                                              
                                                           

                                                            
                                                               



                                                                 






                                                       



                                                         

       



                                            

                                         
                                              





                                                          
                                                                    

                                  
                                                  












                                                                    


                                                                    
                                                                                                     










                                                                          




                                                                   



               

       
                                                             
                                                                                                 

                                                   

       



                                            

                                               
                                              




















                                                                                                 

       





                                                                        


       
defprotocol SNet.Group do
  @moduledoc """
  Describes a set of peers that we want, and accept, to exchange messages with
  on some topic.

  An implementation supplies the abstract operations needed to work with such a
  set: finding peers, listing open connections, broadcasting/gossiping a
  message, and checking whether a peer may take part.
  """

  @doc """
  Looks for new peers of `group`, opens connections to them and reports opened
  connections to `notify_to`.

  Background processes are started when needed; the call itself returns
  immediately. The implementations in this file report each connection with
  `GenServer.cast(notify_to, {:peer_connected, pid})`.
  """
  def init_lookup(group, notify_to)

  @doc """
  Returns every connection currently open to a peer of `group`.
  """
  def get_connections(group)

  @doc """
  Sends `msg` to peers of `group`.

  At most `nmax` peers receive the message, which makes this a suitable
  building block for gossip.

  `opts` is optional and defaults to `[]`. The implementations in this file
  read two keys from it:

    * `:nmax` - maximum number of peers to send to (10 when absent)
    * `:exclude_pid` - list of connection pids that must not be used
      (empty when absent)
  """
  def broadcast(group, msg, opts \\ [])

  @doc """
  Tells whether a peer is allowed to participate in `group`.

  `conn_pid` is the pid of the connection to the peer and `auth` is the
  authentication data attached to that connection (may be `nil`).
  """
  def in_group?(group, conn_pid, auth)
end

defmodule SNet.PubShardGroup do
  @moduledoc """
  Group made of the peers known for a shard, identified by the shard `id`.

  Peers are obtained from `Shard.Manager.get_shard_peers/1`. There is no access
  control: `in_group?/3` accepts every peer.
  """

  defstruct [:id]

  defimpl SNet.Group do
    # Spawns a process that
    #   1. reports every currently open connection to `notify_to`, and
    #   2. asks the network manager to connect to each known shard peer we have
    #      no connection to yet, reporting the connection once it is open.
    # Returns the pid of the spawned process without waiting for it.
    def init_lookup(%SNet.PubShardGroup{id: id}, notify_to) do
      # For now: ask all currently connected peers and connect to new peers we know of
      spawn(fn ->
        for {_, pid, _, _} <- SNet.Manager.list_connections() do
          GenServer.cast(notify_to, {:peer_connected, pid})
        end

        for peer_info <- Shard.Manager.get_shard_peers(id) do
          # The parentheses are required. Without them the comparison becomes
          # part of the argument, i.e. `get_connections_to(peer_info == [])`,
          # so the manager receives a boolean and the `if` no longer tests
          # whether a connection to this peer already exists.
          if SNet.Manager.get_connections_to(peer_info) == [] do
            SNet.Manager.add_peer(peer_info,
              callback: fn pid ->
                GenServer.cast(notify_to, {:peer_connected, pid})
              end
            )
          end
        end
      end)

      # TODO: use a DHT to find peers
    end

    # Returns one connection pid per known shard peer that has at least one
    # open connection (the first connection listed for that peer).
    def get_connections(%SNet.PubShardGroup{id: id}) do
      id
      |> Shard.Manager.get_shard_peers()
      |> Enum.map(&SNet.Manager.get_connections_to/1)
      |> Enum.reject(&(&1 == []))
      |> Enum.map(fn [{pid, _auth} | _] -> pid end)
    end

    # Sends `msg` to at most `opts[:nmax]` peers (default 10).
    #
    # Open connections are used first, in random order, skipping the pids
    # listed in `opts[:exclude_pid]`. If fewer than `nmax` messages were sent
    # this way, the remainder goes through `SNet.Manager.send/2` to randomly
    # chosen shard peers we have no connection to.
    #
    # Returns `nil` when the open connections were enough, otherwise the list
    # of `SNet.Manager.send/2` results.
    def broadcast(%SNet.PubShardGroup{id: id} = group, msg, opts) do
      nmax = opts[:nmax] || 10
      exclude_pid = opts[:exclude_pid] || []

      nsent =
        group
        |> get_connections()
        |> Enum.reject(&(&1 in exclude_pid))
        |> Enum.shuffle()
        |> Enum.take(nmax)
        |> Enum.map(&GenServer.cast(&1, {:send_msg, msg}))
        |> Enum.count()

      if nmax - nsent > 0 do
        id
        |> Shard.Manager.get_shard_peers()
        |> Enum.filter(&(SNet.Manager.get_connections_to(&1) == []))
        |> Enum.shuffle()
        |> Enum.take(nmax - nsent)
        |> Enum.map(&SNet.Manager.send(&1, msg))
      end
    end

    # No access control: every peer is accepted.
    def in_group?(%SNet.PubShardGroup{id: _id}, _peer_pid, _auth) do
      true
    end
  end
end

defmodule SNet.PrivGroup do
  @moduledoc """
  Group restricted to a fixed list of public keys, `pk_list`.

  A connection belongs to the group when both public keys of its
  `%SNet.Auth{}` (`my_pk` and `his_pk`) appear in `pk_list`.
  """

  defstruct [:pk_list]

  defimpl SNet.Group do
    # Spawns a process that reports matching existing connections to
    # `notify_to`, then asks the network manager to open authenticated
    # connections to the peers advertised by each identity of the group.
    # Returns the pid of the spawned process.
    def init_lookup(%SNet.PrivGroup{pk_list: pk_list}, notify_to) do
      announce = fn conn_pid ->
        GenServer.cast(notify_to, {:peer_connected, conn_pid})
      end

      spawn(fn ->
        # 1. Connections we already hold whose two keys are both in the group.
        #    Connections without an %SNet.Auth{} are ignored.
        Enum.each(SNet.Manager.list_connections(), fn
          {_, conn_pid, %SNet.Auth{my_pk: mine, his_pk: theirs}, _} ->
            if key_pair_in_group?(mine, theirs, pk_list) do
              announce.(conn_pid)
            end

          _unauthenticated ->
            nil
        end)

        # 2. New connections. We authenticate with the first key of the list
        #    accepted by Shard.Keys.have_sk?/1 (presumably one whose secret
        #    key we hold); the match fails if there is none.
        [local_pk | _] = Enum.filter(pk_list, &Shard.Keys.have_sk?/1)

        Enum.each(pk_list, fn remote_pk ->
          info =
            remote_pk
            |> SApp.Identity.find_proc()
            |> GenServer.call(:get_info)

          if Map.has_key?(info, :peer_info) do
            Enum.each(info.peer_info, fn peer ->
              SNet.Manager.add_peer(peer,
                auth: %SNet.Auth{my_pk: local_pk, his_pk: remote_pk},
                callback: announce
              )
            end)
          end
        end)
      end)
    end

    # Pids of the open connections whose two authenticated keys are both in
    # the group.
    def get_connections(%SNet.PrivGroup{pk_list: pk_list}) do
      Enum.flat_map(SNet.Manager.list_connections(), fn
        {_, conn_pid, %SNet.Auth{my_pk: mine, his_pk: theirs}, _} ->
          if key_pair_in_group?(mine, theirs, pk_list), do: [conn_pid], else: []

        _unauthenticated ->
          []
      end)
    end

    # Sends `msg` to at most `opts[:nmax]` peers (default 10).
    #
    # Open group connections are served first, in random order and skipping
    # the pids in `opts[:exclude_pid]`. Any remaining budget is spent on
    # advertised peers with no authenticated connection yet, through
    # `SNet.Manager.send_auth/3`.
    #
    # Returns `nil` when no budget remains, otherwise the list of
    # `send_auth/3` results.
    def broadcast(group, msg, opts) do
      limit = opts[:nmax] || 10
      skipped = opts[:exclude_pid] || []

      %SNet.PrivGroup{pk_list: pk_list} = group

      recipients =
        group
        |> get_connections()
        |> Enum.reject(&(&1 in skipped))
        |> Enum.shuffle()
        |> Enum.take(limit)

      Enum.each(recipients, &GenServer.cast(&1, {:send_msg, msg}))
      remaining = limit - length(recipients)

      if remaining > 0 do
        own_pks = Enum.filter(pk_list, &Shard.Keys.have_sk?/1)
        [local_pk | _] = own_pks

        pk_list
        |> Enum.flat_map(&advertised_peers/1)
        |> Enum.filter(fn {peer, remote_pk} ->
          SNet.Manager.get_auth_connections_to(peer, own_pks, remote_pk) == []
        end)
        |> Enum.shuffle()
        |> Enum.take(remaining)
        |> Enum.map(fn {peer, remote_pk} ->
          SNet.Manager.send_auth(peer, %SNet.Auth{my_pk: local_pk, his_pk: remote_pk}, msg)
        end)
      end
    end

    # A peer is accepted only when it is authenticated and both keys of the
    # authentication belong to the group.
    def in_group?(%SNet.PrivGroup{pk_list: pk_list}, _peer_pid, auth) do
      case auth do
        %SNet.Auth{my_pk: mine, his_pk: theirs} ->
          key_pair_in_group?(mine, theirs, pk_list)

        nil ->
          false
      end
    end

    # True when both keys are members of `pk_list`.
    defp key_pair_in_group?(mine, theirs, pk_list) do
      mine in pk_list and theirs in pk_list
    end

    # Lists `{peer_info, pk}` for every peer advertised by the identity `pk`.
    # Yields nothing when the identity process is not found, when it returns
    # no info, or when the info has no `:peer_info` key.
    defp advertised_peers(pk) do
      proc = SApp.Identity.find_proc(pk)
      info = proc && GenServer.call(proc, :get_info)

      if info && Map.has_key?(info, :peer_info) do
        Enum.map(info.peer_info, &{&1, pk})
      else
        []
      end
    end
  end
end