aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/app/chat.ex38
-rw-r--r--lib/cli/cli.ex32
-rw-r--r--lib/data/merklelist.ex2
-rw-r--r--lib/manager.ex16
-rw-r--r--mix.exs8
-rw-r--r--mix.lock11
-rw-r--r--test/conn_test.exs23
-rw-r--r--test/test_helper.exs7
8 files changed, 124 insertions, 13 deletions
diff --git a/lib/app/chat.ex b/lib/app/chat.ex
index 42991ce..a9cfb1e 100644
--- a/lib/app/chat.ex
+++ b/lib/app/chat.ex
@@ -39,7 +39,15 @@ defmodule SApp.Chat do
GenServer.cast(Shard.Manager, {:register, id, manifest, self()})
GenServer.cast(self(), :init_pull)
- {:ok, %{channel: channel, id: id, manifest: manifest, store: store, peers: MapSet.new}}
+ {:ok,
+ %{channel: channel,
+ id: id,
+ manifest: manifest,
+ store: store,
+ peers: MapSet.new,
+ subs: MapSet.new,
+ }
+ }
end
@doc """
@@ -49,6 +57,11 @@ defmodule SApp.Chat do
{:reply, state.manifest, state}
end
+ def handle_call({:read_history, start, num}, _from, state) do
+ ret = ML.read(state.store, start, num)
+ {:reply, ret, state}
+ end
+
@doc """
Implementation of the :redundant handler: if another process is already
synchronizing this channel then we exit.
@@ -80,6 +93,12 @@ defmodule SApp.Chat do
msg}
new_state = %{state | store: ML.insert(state.store, msgitem)}
+ for pid <- state.subs do
+ if Process.alive?(pid) do
+ send(pid, {:chat_send, state.channel, msgitem})
+ end
+ end
+
for peer <- state.peers do
push_messages(new_state, peer, nil, 5)
end
@@ -97,6 +116,11 @@ defmodule SApp.Chat do
{:noreply, %{ state | peers: new_peers }}
end
+ def handle_cast({:subscribe, pid}, state) do
+ new_subs = MapSet.put(state.subs, pid)
+ {:noreply, %{ state | subs: new_subs }}
+ end
+
@doc """
Implementation of the :msg handler, which is the main handler for messages
comming from other peers concerning this chat room.
@@ -110,7 +134,7 @@ defmodule SApp.Chat do
case msg do
{:get_manifest} ->
Shard.Manager.send(peer_id, {state.id, {:manifest, state.manifest}})
- {:get, start} -> push_messages(peer_id, state, start, 20)
+ {:get, start} -> push_messages(state, peer_id, start, 20)
{:info, _start, list, rest} ->
if rest != nil and not ML.has(state.store, rest) do
Shard.Manager.send(peer_id, {state.id, {:get, rest}})
@@ -131,7 +155,7 @@ defmodule SApp.Chat do
end
def handle_cast({:deferred_insert, list}, state) do
- new_store = ML.insert_many(state.store, list, (fn msg -> msg_callback(state.channel, msg) end))
+ new_store = ML.insert_many(state.store, list, (fn msg -> msg_callback(state, msg) end))
{:noreply, %{state | store: new_store}}
end
@@ -143,8 +167,12 @@ defmodule SApp.Chat do
end
end
- defp msg_callback(chan, {ts, nick, msg}) do
- IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} ##{chan} <#{nick}> #{msg}"
+ defp msg_callback(state, {ts, nick, msg}) do
+ for pid <- state.subs do
+ if Process.alive?(pid) do
+ send(pid, {:chat_recv, state.channel, {ts, nick, msg}})
+ end
+ end
end
defp msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do
diff --git a/lib/cli/cli.ex b/lib/cli/cli.ex
index c281d57..8928040 100644
--- a/lib/cli/cli.ex
+++ b/lib/cli/cli.ex
@@ -8,6 +8,8 @@ defmodule SCLI do
end
defp run(pid) do
+ handle_messages()
+
nick = Shard.Identity.get_nickname
prompt = case pid do
nil -> "(no channel) #{nick}: "
@@ -25,11 +27,25 @@ defmodule SCLI do
pid2 = handle_command(pid, command)
run(pid2)
true ->
- GenServer.cast(pid, {:chat_send, str})
+ if str != "" do
+ GenServer.cast(pid, {:chat_send, str})
+ end
run(pid)
end
end
+ defp handle_messages() do
+ receive do
+ {:chat_recv, chan, {ts, nick, msg}} ->
+ IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} ##{chan} <#{nick}> #{msg}"
+ handle_messages()
+ {:chat_send, _, _} ->
+ # do nothing
+ handle_messages()
+ after 10 -> nil
+ end
+ end
+
defp handle_command(pid, ["connect", ipstr, portstr]) do
{:ok, ip} = :inet.parse_address (to_charlist ipstr)
{port, _} = Integer.parse portstr
@@ -47,6 +63,19 @@ defmodule SCLI do
pid
end
+ defp handle_command(pid, ["hist"]) do
+ case GenServer.call(pid, {:read_history, nil, 100}) do
+ {:ok, list, _rest} ->
+ list
+ |> Enum.reverse
+ |> Enum.each(fn {ts, nick, msg} ->
+ IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}"
+ end)
+ _ -> nil
+ end
+ pid
+ end
+
defp handle_command(_pid, ["join", qchan]) do
list = for {_chid, manifest, chpid} <- :ets.tab2list(:shard_db),
{:chat, chan} = manifest,
@@ -54,6 +83,7 @@ defmodule SCLI do
case List.keyfind(list, qchan, 0) do
nil ->
{:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, qchan})
+ GenServer.cast(pid, {:subscribe, self()})
pid
{_, pid} ->
IO.puts "Switching to ##{qchan}"
diff --git a/lib/data/merklelist.ex b/lib/data/merklelist.ex
index 71483bd..357f336 100644
--- a/lib/data/merklelist.ex
+++ b/lib/data/merklelist.ex
@@ -78,7 +78,7 @@ defmodule SData.MerkleList do
callback.(item)
new_state
:duplicate -> insert_many_aux(state, rest, callback)
- :before -> push(insert_many_aux(state_rest, [item | rest], callback), item)
+ :before -> push(insert_many_aux(state_rest, [item | rest], callback), front)
end
end
end
diff --git a/lib/manager.ex b/lib/manager.ex
index 45aae5f..87f95c5 100644
--- a/lib/manager.ex
+++ b/lib/manager.ex
@@ -28,6 +28,8 @@ defmodule Shard.Manager do
use GenServer
+ require Logger
+
def start_link(my_port) do
GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
end
@@ -96,7 +98,9 @@ defmodule Shard.Manager do
add_peer(ip, port, state)
currtime = System.os_time :second
:ets.insert(state.outbox, {peer_id, msg, currtime})
- outbox_cleanup = :ets.fun2ms(fn {_, _, t} when t < currtime - 60 -> true end)
+ outbox_cleanup = [{{:_, :_, :'$1'},
+ [{:<, :'$1', currtime - 60}],
+ [:'$1']}]
:ets.select_delete(state.outbox, outbox_cleanup)
_ -> nil
end
@@ -121,9 +125,13 @@ defmodule Shard.Manager do
defp add_peer(ip, port, state) do
spawn fn ->
- {:ok, client} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false])
- {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}})
- :ok = :gen_tcp.controlling_process(client, pid)
+ case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do
+ {:ok, client} ->
+ {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}})
+ :ok = :gen_tcp.controlling_process(client, pid)
+ _ ->
+ Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped"
+ end
end
end
diff --git a/mix.exs b/mix.exs
index 83a5730..71dac77 100644
--- a/mix.exs
+++ b/mix.exs
@@ -8,7 +8,9 @@ defmodule Shard.MixProject do
elixir: "~> 1.6",
build_embedded: Mix.env == :prod,
start_permanent: Mix.env() == :prod,
- deps: deps()
+ deps: deps(),
+ test_coverage: [tool: ExCoveralls],
+ preferred_cli_env: [coveralls: :test, "coveralls.detail": :test, "coveralls.post": :test, "coveralls.html": :test]
]
end
@@ -27,7 +29,9 @@ defmodule Shard.MixProject do
# {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"},
# {:cowboy, "~> 1.1.2"},
# {:plug, "~> 1.3.4"},
- {:salty, "~> 0.1.3", hex: :libsalty}
+ {:excoveralls, "~> 0.10", only: :test},
+
+ {:salty, "~> 0.1.3", hex: :libsalty},
]
end
end
diff --git a/mix.lock b/mix.lock
index 110ecd7..4f92013 100644
--- a/mix.lock
+++ b/mix.lock
@@ -1,9 +1,20 @@
%{
+ "certifi": {:hex, :certifi, "2.3.1", "d0f424232390bf47d82da8478022301c561cf6445b5b5fb6a84d49a9e76d2639", [:rebar3], [{:parse_trans, "3.2.0", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm"},
"cowboy": {:hex, :cowboy, "1.1.2", "61ac29ea970389a88eca5a65601460162d370a70018afe6f949a29dca91f3bb0", [:rebar3], [{:cowlib, "~> 1.0.2", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3.2", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm"},
"cowlib": {:hex, :cowlib, "1.0.2", "9d769a1d062c9c3ac753096f868ca121e2730b9a377de23dec0f7e08b1df84ee", [:make], [], "hexpm"},
"elixir_make": {:hex, :elixir_make, "0.4.2", "332c649d08c18bc1ecc73b1befc68c647136de4f340b548844efc796405743bf", [:mix], [], "hexpm"},
+ "ex2ms": {:hex, :ex2ms, "1.5.0", "19e27f9212be9a96093fed8cdfbef0a2b56c21237196d26760f11dfcfae58e97", [:mix], [], "hexpm"},
+ "excoveralls": {:hex, :excoveralls, "0.10.0", "a4508bdd408829f38e7b2519f234b7fd5c83846099cda348efcb5291b081200c", [:mix], [{:hackney, "~> 1.13", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"},
+ "hackney": {:hex, :hackney, "1.13.0", "24edc8cd2b28e1c652593833862435c80661834f6c9344e84b6a2255e7aeef03", [:rebar3], [{:certifi, "2.3.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.2", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"},
+ "idna": {:hex, :idna, "5.1.2", "e21cb58a09f0228a9e0b95eaa1217f1bcfc31a1aaa6e1fdf2f53a33f7dbd9494", [:rebar3], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"},
+ "jason": {:hex, :jason, "1.1.1", "d3ccb840dfb06f2f90a6d335b536dd074db748b3e7f5b11ab61d239506585eb2", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"},
+ "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"},
"mime": {:hex, :mime, "1.3.0", "5e8d45a39e95c650900d03f897fbf99ae04f60ab1daa4a34c7a20a5151b7a5fe", [:mix], [], "hexpm"},
+ "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], [], "hexpm"},
+ "parse_trans": {:hex, :parse_trans, "3.2.0", "2adfa4daf80c14dc36f522cf190eb5c4ee3e28008fc6394397c16f62a26258c2", [:rebar3], [], "hexpm"},
"plug": {:hex, :plug, "1.3.6", "bcdf94ac0f4bc3b804bdbdbde37ebf598bd7ed2bfa5106ed1ab5984a09b7e75f", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1", [hex: :cowboy, repo: "hexpm", optional: true]}, {:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}], "hexpm"},
"ranch": {:hex, :ranch, "1.3.2", "e4965a144dc9fbe70e5c077c65e73c57165416a901bd02ea899cfd95aa890986", [:rebar3], [], "hexpm"},
"salty": {:hex, :libsalty, "0.1.3", "13332eb13ac995f5deb76903b44f96f740e1e3a6e511222bffdd8b42cd079ffb", [:make, :mix], [{:elixir_make, "~> 0.4", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm"},
+ "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], [], "hexpm"},
+ "unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [:rebar3], [], "hexpm"},
}
diff --git a/test/conn_test.exs b/test/conn_test.exs
index ff746b7..d2431d7 100644
--- a/test/conn_test.exs
+++ b/test/conn_test.exs
@@ -39,4 +39,27 @@ defmodule ShardTest.Conn do
{:ok, msg} = Box.open_easy(enc, n, pk, sk)
msg
end
+
+
+ test "set nickname" do
+ Shard.Identity.set_nickname "test bot"
+ end
+
+ test "connect to other instance" do
+ Shard.Manager.add_peer({127, 0, 0, 1}, 4045)
+ receive do after 100 -> nil end
+ end
+
+ test "connect to chat rooms" do
+ {:ok, pid1} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "test"})
+ {:ok, pid2} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "other_test"})
+ GenServer.cast(pid1, {:chat_send, "test msg 1"})
+ GenServer.cast(pid2, {:chat_send, "test msg 2"})
+
+ receive do after 100 -> nil end
+ {:ok, pid3} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "test"})
+ receive do after 100 -> nil end
+ assert not Process.alive?(pid3)
+ end
+
end
diff --git a/test/test_helper.exs b/test/test_helper.exs
index 869559e..e5b6600 100644
--- a/test/test_helper.exs
+++ b/test/test_helper.exs
@@ -1 +1,8 @@
ExUnit.start()
+
+case :gen_tcp.connect('localhost', 4045, []) do
+ {:ok, socket} ->
+ :gen_tcp.close(socket)
+ {:error, _reason} ->
+ Mix.raise "Please have another instance of Shard running at 127.0.0.1:4045, it can be launched with the command: PORT=4045 iex -S mix"
+end