aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/app/chat.ex38
-rw-r--r--lib/cli/cli.ex32
-rw-r--r--lib/data/merklelist.ex2
-rw-r--r--lib/manager.ex16
4 files changed, 77 insertions, 11 deletions
diff --git a/lib/app/chat.ex b/lib/app/chat.ex
index 42991ce..a9cfb1e 100644
--- a/lib/app/chat.ex
+++ b/lib/app/chat.ex
@@ -39,7 +39,15 @@ defmodule SApp.Chat do
GenServer.cast(Shard.Manager, {:register, id, manifest, self()})
GenServer.cast(self(), :init_pull)
- {:ok, %{channel: channel, id: id, manifest: manifest, store: store, peers: MapSet.new}}
+ {:ok,
+ %{channel: channel,
+ id: id,
+ manifest: manifest,
+ store: store,
+ peers: MapSet.new,
+ subs: MapSet.new,
+ }
+ }
end
@doc """
@@ -49,6 +57,11 @@ defmodule SApp.Chat do
{:reply, state.manifest, state}
end
+ def handle_call({:read_history, start, num}, _from, state) do
+ ret = ML.read(state.store, start, num)
+ {:reply, ret, state}
+ end
+
@doc """
Implementation of the :redundant handler: if another process is already
synchronizing this channel then we exit.
@@ -80,6 +93,12 @@ defmodule SApp.Chat do
msg}
new_state = %{state | store: ML.insert(state.store, msgitem)}
+ for pid <- state.subs do
+ if Process.alive?(pid) do
+ send(pid, {:chat_send, state.channel, msgitem})
+ end
+ end
+
for peer <- state.peers do
push_messages(new_state, peer, nil, 5)
end
@@ -97,6 +116,11 @@ defmodule SApp.Chat do
{:noreply, %{ state | peers: new_peers }}
end
+ def handle_cast({:subscribe, pid}, state) do
+ new_subs = MapSet.put(state.subs, pid)
+ {:noreply, %{ state | subs: new_subs }}
+ end
+
@doc """
Implementation of the :msg handler, which is the main handler for messages
comming from other peers concerning this chat room.
@@ -110,7 +134,7 @@ defmodule SApp.Chat do
case msg do
{:get_manifest} ->
Shard.Manager.send(peer_id, {state.id, {:manifest, state.manifest}})
- {:get, start} -> push_messages(peer_id, state, start, 20)
+ {:get, start} -> push_messages(state, peer_id, start, 20)
{:info, _start, list, rest} ->
if rest != nil and not ML.has(state.store, rest) do
Shard.Manager.send(peer_id, {state.id, {:get, rest}})
@@ -131,7 +155,7 @@ defmodule SApp.Chat do
end
def handle_cast({:deferred_insert, list}, state) do
- new_store = ML.insert_many(state.store, list, (fn msg -> msg_callback(state.channel, msg) end))
+ new_store = ML.insert_many(state.store, list, (fn msg -> msg_callback(state, msg) end))
{:noreply, %{state | store: new_store}}
end
@@ -143,8 +167,12 @@ defmodule SApp.Chat do
end
end
- defp msg_callback(chan, {ts, nick, msg}) do
- IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} ##{chan} <#{nick}> #{msg}"
+ defp msg_callback(state, {ts, nick, msg}) do
+ for pid <- state.subs do
+ if Process.alive?(pid) do
+ send(pid, {:chat_recv, state.channel, {ts, nick, msg}})
+ end
+ end
end
defp msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do
diff --git a/lib/cli/cli.ex b/lib/cli/cli.ex
index c281d57..8928040 100644
--- a/lib/cli/cli.ex
+++ b/lib/cli/cli.ex
@@ -8,6 +8,8 @@ defmodule SCLI do
end
defp run(pid) do
+ handle_messages()
+
nick = Shard.Identity.get_nickname
prompt = case pid do
nil -> "(no channel) #{nick}: "
@@ -25,11 +27,25 @@ defmodule SCLI do
pid2 = handle_command(pid, command)
run(pid2)
true ->
- GenServer.cast(pid, {:chat_send, str})
+ if str != "" do
+ GenServer.cast(pid, {:chat_send, str})
+ end
run(pid)
end
end
+ defp handle_messages() do
+ receive do
+ {:chat_recv, chan, {ts, nick, msg}} ->
+ IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} ##{chan} <#{nick}> #{msg}"
+ handle_messages()
+ {:chat_send, _, _} ->
+ # do nothing
+ handle_messages()
+ after 10 -> nil
+ end
+ end
+
defp handle_command(pid, ["connect", ipstr, portstr]) do
{:ok, ip} = :inet.parse_address (to_charlist ipstr)
{port, _} = Integer.parse portstr
@@ -47,6 +63,19 @@ defmodule SCLI do
pid
end
+ defp handle_command(pid, ["hist"]) do
+ case GenServer.call(pid, {:read_history, nil, 100}) do
+ {:ok, list, _rest} ->
+ list
+ |> Enum.reverse
+ |> Enum.each(fn {ts, nick, msg} ->
+ IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}"
+ end)
+ _ -> nil
+ end
+ pid
+ end
+
defp handle_command(_pid, ["join", qchan]) do
list = for {_chid, manifest, chpid} <- :ets.tab2list(:shard_db),
{:chat, chan} = manifest,
@@ -54,6 +83,7 @@ defmodule SCLI do
case List.keyfind(list, qchan, 0) do
nil ->
{:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, qchan})
+ GenServer.cast(pid, {:subscribe, self()})
pid
{_, pid} ->
IO.puts "Switching to ##{qchan}"
diff --git a/lib/data/merklelist.ex b/lib/data/merklelist.ex
index 71483bd..357f336 100644
--- a/lib/data/merklelist.ex
+++ b/lib/data/merklelist.ex
@@ -78,7 +78,7 @@ defmodule SData.MerkleList do
callback.(item)
new_state
:duplicate -> insert_many_aux(state, rest, callback)
- :before -> push(insert_many_aux(state_rest, [item | rest], callback), item)
+ :before -> push(insert_many_aux(state_rest, [item | rest], callback), front)
end
end
end
diff --git a/lib/manager.ex b/lib/manager.ex
index 45aae5f..87f95c5 100644
--- a/lib/manager.ex
+++ b/lib/manager.ex
@@ -28,6 +28,8 @@ defmodule Shard.Manager do
use GenServer
+ require Logger
+
def start_link(my_port) do
GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
end
@@ -96,7 +98,9 @@ defmodule Shard.Manager do
add_peer(ip, port, state)
currtime = System.os_time :second
:ets.insert(state.outbox, {peer_id, msg, currtime})
- outbox_cleanup = :ets.fun2ms(fn {_, _, t} when t < currtime - 60 -> true end)
+ outbox_cleanup = [{{:_, :_, :'$1'},
+ [{:<, :'$1', currtime - 60}],
+ [:'$1']}]
:ets.select_delete(state.outbox, outbox_cleanup)
_ -> nil
end
@@ -121,9 +125,13 @@ defmodule Shard.Manager do
defp add_peer(ip, port, state) do
spawn fn ->
- {:ok, client} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false])
- {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}})
- :ok = :gen_tcp.controlling_process(client, pid)
+ case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do
+ {:ok, client} ->
+ {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}})
+ :ok = :gen_tcp.controlling_process(client, pid)
+ _ ->
+ Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped"
+ end
end
end