From b234a360dafa0a65d797614aad5a4514570784f8 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 4 Jul 2018 13:17:26 +0200 Subject: Multiple chat rooms (no web UI yet) --- lib/app/chat.ex | 83 +++++++++++++++++++++++++++++++++++++++++++++----- lib/application.ex | 4 +-- lib/cli/cli.ex | 56 +++++++++++++++++++++++++++++----- lib/data/merklelist.ex | 4 +-- lib/manager.ex | 46 ++++++++++++++++++++++++++++ lib/net/manager.ex | 9 ++++-- lib/net/tcpconn.ex | 34 ++++++--------------- lib/net/tcpserver.ex | 2 +- 8 files changed, 191 insertions(+), 47 deletions(-) create mode 100644 lib/manager.ex diff --git a/lib/app/chat.ex b/lib/app/chat.ex index 4a56085..f165514 100644 --- a/lib/app/chat.ex +++ b/lib/app/chat.ex @@ -1,17 +1,86 @@ defmodule SApp.Chat do - def send(msg) do + use GenServer + + def start_link(channel) do + GenServer.start_link(__MODULE__, channel) + end + + def init(channel) do + {:ok, store} = SData.MerkleList.start_link(&msg_cmp/2) + manifest = {:chat, channel} + id = :crypto.hash(:sha256, :erlang.term_to_binary(manifest)) + + GenServer.cast(Shard.Manager, {:register, id, self()}) + GenServer.cast(self(), :init_pull) + + {:ok, %{channel: channel, id: id, manifest: manifest, store: store, peers: %{}}} + end + + def handle_call(:manifest, _from, state) do + {:reply, state.manifest, state} + end + + def handle_cast({:redundant, _}, _state) do + exit :normal + end + + def handle_cast(:init_pull, state) do + GenServer.call(SNet.Manager, :get_all) + |> Enum.each(&(GenServer.cast(&1, {:send_msg, {:interested, [state.id]}}))) + {:noreply, state} + end + + def handle_cast({:chat_send, msg}, state) do msgitem = {(System.os_time :seconds), Shard.Identity.get_nickname(), msg} - GenServer.cast(SApp.Chat.Log, {:insert, msgitem}) + GenServer.cast(state.store, {:insert, msgitem}) + + for {_, pid} <- state.peers do + push_messages(state, pid, nil, 5) + end - SNet.ConnSupervisor - |> DynamicSupervisor.which_children - |> Enum.each(fn {_, pid, _, _} -> GenServer.cast(pid, :init_push) end) + {:noreply, state} end - def msg_callback({ts, nick, msg}) do - IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}" + def handle_cast({:interested, peer_id, peer_pid}, state) do + push_messages(state, peer_pid, nil, 10) + new_peers = Map.put(state.peers, peer_id, peer_pid) + {:noreply, %{ state | peers: new_peers }} + end + + def handle_cast({:msg, peer_id, peer_pid, msg}, state) do + case msg do + :get_top -> push_messages(peer_id, state, nil, 10) + {:get, start} -> push_messages(peer_id, state, start, 20) + {:info, _start, list, rest} -> + if rest != nil and not GenServer.call(state.store, {:has, rest}) do + GenServer.cast(peer_pid, {:send_msg, {state.id, {:get, rest}}}) + end + spawn_link(fn -> + Process.sleep 1000 + GenServer.cast(state.store, {:insert_many, list, (fn msg -> msg_callback(state.channel, msg) end)}) + end) + end + + if Map.has_key?(state.peers, peer_id) do + {:noreply, state} + else + handle_cast({:interested, peer_id, peer_pid}, state) + end + end + + defp push_messages(state, to, start, num) do + case GenServer.call(state.store, {:read, start, num}) do + {:ok, list, rest} -> + GenServer.cast(to, {:send_msg, {state.id, {:info, start, list, rest}}}) + _ -> nil + end + end + + + def msg_callback(chan, {ts, nick, msg}) do + IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} ##{chan} <#{nick}> #{msg}" end def msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do diff --git a/lib/application.ex b/lib/application.ex index 3ad9325..e375cf4 100644 --- a/lib/application.ex +++ b/lib/application.ex @@ -13,14 +13,14 @@ defmodule Shard.Application do # Define workers and child supervisors to be supervised children = [ Shard.Identity, + { DynamicSupervisor, strategy: :one_for_one, name: Shard.DynamicSupervisor }, # Networking { SNet.Manager, listen_port }, { SNet.TCPServer, listen_port }, - { DynamicSupervisor, strategy: :one_for_one, name: SNet.ConnSupervisor }, # Applications & data store - { SData.MerkleList, [&SApp.Chat.msg_cmp/2, name: SApp.Chat.Log] }, + Shard.Manager, # Web UI Plug.Adapters.Cowboy.child_spec(:http, SWeb.HTTPRouter, [], port: listen_port + 1000) diff --git a/lib/cli/cli.ex b/lib/cli/cli.ex index 28ef5d0..0402b3f 100644 --- a/lib/cli/cli.ex +++ b/lib/cli/cli.ex @@ -1,30 +1,70 @@ defmodule SCLI do def run() do - str = "say: " |> IO.gets |> String.trim + run(nil) + end + + defp run(pid) do + nick = Shard.Identity.get_nickname + prompt = case pid do + nil -> "(no channel) #{nick}: " + _ -> + {:chat, chan} = GenServer.call(pid, :manifest) + "##{chan} #{nick}: " + end + + str = prompt |> IO.gets |> String.trim cond do str == "/quit" -> nil String.slice(str, 0..0) == "/" -> command = str |> String.slice(1..-1) |> String.split(" ") - handle_command(command) - run() + pid2 = handle_command(pid, command) + run(pid2) true -> - SApp.Chat.send(str) - run() + GenServer.cast(pid, {:chat_send, str}) + run(pid) end end - def handle_command(["connect", ipstr, portstr]) do + def handle_command(pid, ["connect", ipstr, portstr]) do {:ok, ip} = :inet.parse_address (to_charlist ipstr) {port, _} = Integer.parse portstr SNet.Manager.add_peer(ip, port) + pid + end + + def handle_command(pid, ["list"]) do + IO.puts "List of known channels:" + list = GenServer.call(Shard.Manager, :list) + for {_chid, chpid} <- list do + {:chat, chan} = GenServer.call(chpid, :manifest) + IO.puts "##{chan}" + end + pid + end + + def handle_command(pid, ["join", qchan]) do + list = GenServer.call(Shard.Manager, :list) + list = for {_chid, chpid} <- list, + {:chat, chan} = GenServer.call(chpid, :manifest), + do: {chan, chpid} + case List.keyfind(list, qchan, 0) do + nil -> + {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, qchan}) + pid + {_, pid} -> + IO.puts "Switching to ##{qchan}" + pid + end end - def handle_command(["nick", nick]) do + def handle_command(pid, ["nick", nick]) do Shard.Identity.set_nickname nick + pid end - def handle_command(_cmd) do + def handle_command(pid, _cmd) do IO.puts "Invalid command" + pid end end diff --git a/lib/data/merklelist.ex b/lib/data/merklelist.ex index c9e27f6..42b80fc 100644 --- a/lib/data/merklelist.ex +++ b/lib/data/merklelist.ex @@ -1,8 +1,8 @@ defmodule SData.MerkleList do use GenServer - def start_link([cmp, name: name]) do - GenServer.start_link(__MODULE__, cmp, [name: name]) + def start_link(cmp) do + GenServer.start_link(__MODULE__, cmp) end defp term_hash(term) do diff --git a/lib/manager.ex b/lib/manager.ex new file mode 100644 index 0000000..ce11117 --- /dev/null +++ b/lib/manager.ex @@ -0,0 +1,46 @@ +defmodule Shard.Manager do + use GenServer + + def start_link(_) do + GenServer.start_link(__MODULE__, nil, name: __MODULE__) + end + + def init(_) do + state = %{ + shards: %{} + } + {:ok, state} + end + + def handle_call({:find, shard_id}, _from, state) do + {:reply, state.shards[shard_id], state} + end + + def handle_call(:list, _from, state) do + {:reply, Map.to_list(state.shards), state} + end + + def handle_cast({:register, shard_id, pid}, state) do + if Map.has_key?(state.shards, shard_id) do + GenServer.cast(pid, {:redundant, shard_id}) + state + else + new_shards = Map.put(state.shards, shard_id, pid) + {:noreply, %{ state | shards: new_shards }} + end + end + + def handle_cast({:interested, peer_id, peer_pid, shards}, state) do + shards + |> Enum.filter(&(Map.has_key?(state.shards, &1))) + |> Enum.each(&(GenServer.cast(state.shards[&1], {:interested, peer_id, peer_pid}))) + {:noreply, state} + end + + def handle_cast({:dispatch, peer_id, peer_pid, shard, msg}, state) do + if Map.has_key?(state.shards, shard) do + GenServer.cast(state.shards[shard], {:msg, peer_id, peer_pid, msg}) + end + {:noreply, state} + end +end diff --git a/lib/net/manager.ex b/lib/net/manager.ex index e5eb12d..4b1ce94 100644 --- a/lib/net/manager.ex +++ b/lib/net/manager.ex @@ -40,7 +40,12 @@ defmodule SNet.Manager do {:noreply, state} end - def handle_call({:get_connections, pk_list}, state) do + def handle_call(:get_all, _from, state) do + pid_list = (for {_, {pid, _, _}} <- state.peers, pid != nil, do: pid) + {:reply, pid_list, state} + end + + def handle_call({:get_connections, pk_list}, _from, state) do pid_list = (for pk <- pk_list, Map.has_key?(state.peers, pk), do: state.peers[pk]) |> Enum.map(fn {pid, _, _} -> pid end) |> Enum.filter(&(&1 != nil)) @@ -53,7 +58,7 @@ defmodule SNet.Manager do def add_peer(ip, port, my_port) do {:ok, client} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) - {:ok, pid} = DynamicSupervisor.start_child(SNet.ConnSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port}}) + {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port}}) :ok = :gen_tcp.controlling_process(client, pid) pid end diff --git a/lib/net/tcpconn.ex b/lib/net/tcpconn.ex index 301e931..64c85e9 100644 --- a/lib/net/tcpconn.ex +++ b/lib/net/tcpconn.ex @@ -53,10 +53,9 @@ defmodule SNet.TCPConn do addr: addr, port: port } - GenServer.cast(SNet.Manager, {:peer_up, self(), cli_pkey, addr, his_port}) + GenServer.cast(SNet.Manager, {:peer_up, cli_pkey, self(), addr, his_port}) Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}" - - GenServer.cast(self(), :init_push) + GenServer.cast(self(), :init_pull) {:noreply, state} end @@ -66,8 +65,9 @@ defmodule SNet.TCPConn do {:noreply, state} end - def handle_cast(:init_push, state) do - push_messages(state, nil, 10) + def handle_cast(:init_pull, state) do + id_list = (for {id, _} <- GenServer.call(Shard.Manager, :list), do: id) + send_msg(state, {:interested, id_list}) {:noreply, state} end @@ -102,28 +102,12 @@ defmodule SNet.TCPConn do exit(:normal) end - defp push_messages(state, start, num) do - case GenServer.call(SApp.Chat.Log, {:read, start, num}) do - {:ok, list, rest} -> - send_msg(state, {:info, start, list, rest}) - _ -> nil - end + defp handle_packet({:interested, shards}, state) do + GenServer.cast(Shard.Manager, {:interested, state.his_pkey, self(), shards}) end - defp handle_packet(msg, state) do - # Logger.info "Message: #{inspect msg}" - case msg do - :get_top -> push_messages(state, nil, 10) - {:get, start} -> push_messages(state, start, 20) - {:info, _start, list, rest} -> - if rest != nil and not GenServer.call(SApp.Chat.Log, {:has, rest}) do - send_msg(state, {:get, rest}) - end - spawn_link(fn -> - Process.sleep 1000 - GenServer.cast(SApp.Chat.Log, {:insert_many, list, &SApp.Chat.msg_callback/1}) - end) - end + defp handle_packet({shard, msg}, state) do + GenServer.cast(Shard.Manager, {:dispatch, state.his_pkey, self(), shard, msg}) end defp print_id(state) do diff --git a/lib/net/tcpserver.ex b/lib/net/tcpserver.ex index 7c758c1..46552a4 100644 --- a/lib/net/tcpserver.ex +++ b/lib/net/tcpserver.ex @@ -18,7 +18,7 @@ defmodule SNet.TCPServer do defp loop_acceptor(socket, my_port) do {:ok, client} = :gen_tcp.accept(socket) - {:ok, pid} = DynamicSupervisor.start_child(SNet.ConnSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port}}) + {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port}}) :ok = :gen_tcp.controlling_process(client, pid) loop_acceptor(socket, my_port) end -- cgit v1.2.3