diff options
-rw-r--r-- | lib/app/chat.ex | 38 | ||||
-rw-r--r-- | lib/cli/cli.ex | 32 | ||||
-rw-r--r-- | lib/data/merklelist.ex | 2 | ||||
-rw-r--r-- | lib/manager.ex | 16 | ||||
-rw-r--r-- | mix.exs | 8 | ||||
-rw-r--r-- | mix.lock | 11 | ||||
-rw-r--r-- | test/conn_test.exs | 23 | ||||
-rw-r--r-- | test/test_helper.exs | 7 |
8 files changed, 124 insertions, 13 deletions
diff --git a/lib/app/chat.ex b/lib/app/chat.ex index 42991ce..a9cfb1e 100644 --- a/lib/app/chat.ex +++ b/lib/app/chat.ex @@ -39,7 +39,15 @@ defmodule SApp.Chat do GenServer.cast(Shard.Manager, {:register, id, manifest, self()}) GenServer.cast(self(), :init_pull) - {:ok, %{channel: channel, id: id, manifest: manifest, store: store, peers: MapSet.new}} + {:ok, + %{channel: channel, + id: id, + manifest: manifest, + store: store, + peers: MapSet.new, + subs: MapSet.new, + } + } end @doc """ @@ -49,6 +57,11 @@ defmodule SApp.Chat do {:reply, state.manifest, state} end + def handle_call({:read_history, start, num}, _from, state) do + ret = ML.read(state.store, start, num) + {:reply, ret, state} + end + @doc """ Implementation of the :redundant handler: if another process is already synchronizing this channel then we exit. @@ -80,6 +93,12 @@ defmodule SApp.Chat do msg} new_state = %{state | store: ML.insert(state.store, msgitem)} + for pid <- state.subs do + if Process.alive?(pid) do + send(pid, {:chat_send, state.channel, msgitem}) + end + end + for peer <- state.peers do push_messages(new_state, peer, nil, 5) end @@ -97,6 +116,11 @@ defmodule SApp.Chat do {:noreply, %{ state | peers: new_peers }} end + def handle_cast({:subscribe, pid}, state) do + new_subs = MapSet.put(state.subs, pid) + {:noreply, %{ state | subs: new_subs }} + end + @doc """ Implementation of the :msg handler, which is the main handler for messages comming from other peers concerning this chat room. @@ -110,7 +134,7 @@ defmodule SApp.Chat do case msg do {:get_manifest} -> Shard.Manager.send(peer_id, {state.id, {:manifest, state.manifest}}) - {:get, start} -> push_messages(peer_id, state, start, 20) + {:get, start} -> push_messages(state, peer_id, start, 20) {:info, _start, list, rest} -> if rest != nil and not ML.has(state.store, rest) do Shard.Manager.send(peer_id, {state.id, {:get, rest}}) @@ -131,7 +155,7 @@ defmodule SApp.Chat do end def handle_cast({:deferred_insert, list}, state) do - new_store = ML.insert_many(state.store, list, (fn msg -> msg_callback(state.channel, msg) end)) + new_store = ML.insert_many(state.store, list, (fn msg -> msg_callback(state, msg) end)) {:noreply, %{state | store: new_store}} end @@ -143,8 +167,12 @@ defmodule SApp.Chat do end end - defp msg_callback(chan, {ts, nick, msg}) do - IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} ##{chan} <#{nick}> #{msg}" + defp msg_callback(state, {ts, nick, msg}) do + for pid <- state.subs do + if Process.alive?(pid) do + send(pid, {:chat_recv, state.channel, {ts, nick, msg}}) + end + end end defp msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do diff --git a/lib/cli/cli.ex b/lib/cli/cli.ex index c281d57..8928040 100644 --- a/lib/cli/cli.ex +++ b/lib/cli/cli.ex @@ -8,6 +8,8 @@ defmodule SCLI do end defp run(pid) do + handle_messages() + nick = Shard.Identity.get_nickname prompt = case pid do nil -> "(no channel) #{nick}: " @@ -25,11 +27,25 @@ defmodule SCLI do pid2 = handle_command(pid, command) run(pid2) true -> - GenServer.cast(pid, {:chat_send, str}) + if str != "" do + GenServer.cast(pid, {:chat_send, str}) + end run(pid) end end + defp handle_messages() do + receive do + {:chat_recv, chan, {ts, nick, msg}} -> + IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} ##{chan} <#{nick}> #{msg}" + handle_messages() + {:chat_send, _, _} -> + # do nothing + handle_messages() + after 10 -> nil + end + end + defp handle_command(pid, ["connect", ipstr, portstr]) do {:ok, ip} = :inet.parse_address (to_charlist ipstr) {port, _} = Integer.parse portstr @@ -47,6 +63,19 @@ defmodule SCLI do pid end + defp handle_command(pid, ["hist"]) do + case GenServer.call(pid, {:read_history, nil, 100}) do + {:ok, list, _rest} -> + list + |> Enum.reverse + |> Enum.each(fn {ts, nick, msg} -> + IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}" + end) + _ -> nil + end + pid + end + defp handle_command(_pid, ["join", qchan]) do list = for {_chid, manifest, chpid} <- :ets.tab2list(:shard_db), {:chat, chan} = manifest, @@ -54,6 +83,7 @@ defmodule SCLI do case List.keyfind(list, qchan, 0) do nil -> {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, qchan}) + GenServer.cast(pid, {:subscribe, self()}) pid {_, pid} -> IO.puts "Switching to ##{qchan}" diff --git a/lib/data/merklelist.ex b/lib/data/merklelist.ex index 71483bd..357f336 100644 --- a/lib/data/merklelist.ex +++ b/lib/data/merklelist.ex @@ -78,7 +78,7 @@ defmodule SData.MerkleList do callback.(item) new_state :duplicate -> insert_many_aux(state, rest, callback) - :before -> push(insert_many_aux(state_rest, [item | rest], callback), item) + :before -> push(insert_many_aux(state_rest, [item | rest], callback), front) end end end diff --git a/lib/manager.ex b/lib/manager.ex index 45aae5f..87f95c5 100644 --- a/lib/manager.ex +++ b/lib/manager.ex @@ -28,6 +28,8 @@ defmodule Shard.Manager do use GenServer + require Logger + def start_link(my_port) do GenServer.start_link(__MODULE__, my_port, name: __MODULE__) end @@ -96,7 +98,9 @@ defmodule Shard.Manager do add_peer(ip, port, state) currtime = System.os_time :second :ets.insert(state.outbox, {peer_id, msg, currtime}) - outbox_cleanup = :ets.fun2ms(fn {_, _, t} when t < currtime - 60 -> true end) + outbox_cleanup = [{{:_, :_, :'$1'}, + [{:<, :'$1', currtime - 60}], + [:'$1']}] :ets.select_delete(state.outbox, outbox_cleanup) _ -> nil end @@ -121,9 +125,13 @@ defmodule Shard.Manager do defp add_peer(ip, port, state) do spawn fn -> - {:ok, client} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) - {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}}) - :ok = :gen_tcp.controlling_process(client, pid) + case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do + {:ok, client} -> + {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}}) + :ok = :gen_tcp.controlling_process(client, pid) + _ -> + Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped" + end end end @@ -8,7 +8,9 @@ defmodule Shard.MixProject do elixir: "~> 1.6", build_embedded: Mix.env == :prod, start_permanent: Mix.env() == :prod, - deps: deps() + deps: deps(), + test_coverage: [tool: ExCoveralls], + preferred_cli_env: [coveralls: :test, "coveralls.detail": :test, "coveralls.post": :test, "coveralls.html": :test] ] end @@ -27,7 +29,9 @@ defmodule Shard.MixProject do # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}, # {:cowboy, "~> 1.1.2"}, # {:plug, "~> 1.3.4"}, - {:salty, "~> 0.1.3", hex: :libsalty} + {:excoveralls, "~> 0.10", only: :test}, + + {:salty, "~> 0.1.3", hex: :libsalty}, ] end end @@ -1,9 +1,20 @@ %{ + "certifi": {:hex, :certifi, "2.3.1", "d0f424232390bf47d82da8478022301c561cf6445b5b5fb6a84d49a9e76d2639", [:rebar3], [{:parse_trans, "3.2.0", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm"}, "cowboy": {:hex, :cowboy, "1.1.2", "61ac29ea970389a88eca5a65601460162d370a70018afe6f949a29dca91f3bb0", [:rebar3], [{:cowlib, "~> 1.0.2", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3.2", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm"}, "cowlib": {:hex, :cowlib, "1.0.2", "9d769a1d062c9c3ac753096f868ca121e2730b9a377de23dec0f7e08b1df84ee", [:make], [], "hexpm"}, "elixir_make": {:hex, :elixir_make, "0.4.2", "332c649d08c18bc1ecc73b1befc68c647136de4f340b548844efc796405743bf", [:mix], [], "hexpm"}, + "ex2ms": {:hex, :ex2ms, "1.5.0", "19e27f9212be9a96093fed8cdfbef0a2b56c21237196d26760f11dfcfae58e97", [:mix], [], "hexpm"}, + "excoveralls": {:hex, :excoveralls, "0.10.0", "a4508bdd408829f38e7b2519f234b7fd5c83846099cda348efcb5291b081200c", [:mix], [{:hackney, "~> 1.13", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"}, + "hackney": {:hex, :hackney, "1.13.0", "24edc8cd2b28e1c652593833862435c80661834f6c9344e84b6a2255e7aeef03", [:rebar3], [{:certifi, "2.3.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.2", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"}, + "idna": {:hex, :idna, "5.1.2", "e21cb58a09f0228a9e0b95eaa1217f1bcfc31a1aaa6e1fdf2f53a33f7dbd9494", [:rebar3], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"}, + "jason": {:hex, :jason, "1.1.1", "d3ccb840dfb06f2f90a6d335b536dd074db748b3e7f5b11ab61d239506585eb2", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"}, + "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"}, "mime": {:hex, :mime, "1.3.0", "5e8d45a39e95c650900d03f897fbf99ae04f60ab1daa4a34c7a20a5151b7a5fe", [:mix], [], "hexpm"}, + "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], [], "hexpm"}, + "parse_trans": {:hex, :parse_trans, "3.2.0", "2adfa4daf80c14dc36f522cf190eb5c4ee3e28008fc6394397c16f62a26258c2", [:rebar3], [], "hexpm"}, "plug": {:hex, :plug, "1.3.6", "bcdf94ac0f4bc3b804bdbdbde37ebf598bd7ed2bfa5106ed1ab5984a09b7e75f", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1", [hex: :cowboy, repo: "hexpm", optional: true]}, {:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}], "hexpm"}, "ranch": {:hex, :ranch, "1.3.2", "e4965a144dc9fbe70e5c077c65e73c57165416a901bd02ea899cfd95aa890986", [:rebar3], [], "hexpm"}, "salty": {:hex, :libsalty, "0.1.3", "13332eb13ac995f5deb76903b44f96f740e1e3a6e511222bffdd8b42cd079ffb", [:make, :mix], [{:elixir_make, "~> 0.4", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm"}, + "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], [], "hexpm"}, + "unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [:rebar3], [], "hexpm"}, } diff --git a/test/conn_test.exs b/test/conn_test.exs index ff746b7..d2431d7 100644 --- a/test/conn_test.exs +++ b/test/conn_test.exs @@ -39,4 +39,27 @@ defmodule ShardTest.Conn do {:ok, msg} = Box.open_easy(enc, n, pk, sk) msg end + + + test "set nickname" do + Shard.Identity.set_nickname "test bot" + end + + test "connect to other instance" do + Shard.Manager.add_peer({127, 0, 0, 1}, 4045) + receive do after 100 -> nil end + end + + test "connect to chat rooms" do + {:ok, pid1} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "test"}) + {:ok, pid2} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "other_test"}) + GenServer.cast(pid1, {:chat_send, "test msg 1"}) + GenServer.cast(pid2, {:chat_send, "test msg 2"}) + + receive do after 100 -> nil end + {:ok, pid3} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "test"}) + receive do after 100 -> nil end + assert not Process.alive?(pid3) + end + end diff --git a/test/test_helper.exs b/test/test_helper.exs index 869559e..e5b6600 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1 +1,8 @@ ExUnit.start() + +case :gen_tcp.connect('localhost', 4045, []) do + {:ok, socket} -> + :gen_tcp.close(socket) + {:error, _reason} -> + Mix.raise "Please have another instance of Shard running at 127.0.0.1:4045, it can be launched with the command: PORT=4045 iex -S mix" +end |