diff options
author | Alex Auvolat <alex@adnab.me> | 2018-08-27 16:19:53 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2018-08-27 16:19:53 +0200 |
commit | 233989490b0b01670b03154f9e8f83be13e5a89a (patch) | |
tree | 9a98641848fc57f85ce2c451911d4030c05153f9 /lib | |
parent | 6cc81b55f2466cd7526f47da6980e3eb47041457 (diff) | |
download | shard-233989490b0b01670b03154f9e8f83be13e5a89a.tar.gz shard-233989490b0b01670b03154f9e8f83be13e5a89a.zip |
Big fixes (1 line), small changes (many lines)
Diffstat (limited to 'lib')
-rw-r--r-- | lib/app/chat.ex | 38 | ||||
-rw-r--r-- | lib/cli/cli.ex | 32 | ||||
-rw-r--r-- | lib/data/merklelist.ex | 2 | ||||
-rw-r--r-- | lib/manager.ex | 16 |
4 files changed, 77 insertions, 11 deletions
diff --git a/lib/app/chat.ex b/lib/app/chat.ex index 42991ce..a9cfb1e 100644 --- a/lib/app/chat.ex +++ b/lib/app/chat.ex @@ -39,7 +39,15 @@ defmodule SApp.Chat do GenServer.cast(Shard.Manager, {:register, id, manifest, self()}) GenServer.cast(self(), :init_pull) - {:ok, %{channel: channel, id: id, manifest: manifest, store: store, peers: MapSet.new}} + {:ok, + %{channel: channel, + id: id, + manifest: manifest, + store: store, + peers: MapSet.new, + subs: MapSet.new, + } + } end @doc """ @@ -49,6 +57,11 @@ defmodule SApp.Chat do {:reply, state.manifest, state} end + def handle_call({:read_history, start, num}, _from, state) do + ret = ML.read(state.store, start, num) + {:reply, ret, state} + end + @doc """ Implementation of the :redundant handler: if another process is already synchronizing this channel then we exit. @@ -80,6 +93,12 @@ defmodule SApp.Chat do msg} new_state = %{state | store: ML.insert(state.store, msgitem)} + for pid <- state.subs do + if Process.alive?(pid) do + send(pid, {:chat_send, state.channel, msgitem}) + end + end + for peer <- state.peers do push_messages(new_state, peer, nil, 5) end @@ -97,6 +116,11 @@ defmodule SApp.Chat do {:noreply, %{ state | peers: new_peers }} end + def handle_cast({:subscribe, pid}, state) do + new_subs = MapSet.put(state.subs, pid) + {:noreply, %{ state | subs: new_subs }} + end + @doc """ Implementation of the :msg handler, which is the main handler for messages comming from other peers concerning this chat room. @@ -110,7 +134,7 @@ defmodule SApp.Chat do case msg do {:get_manifest} -> Shard.Manager.send(peer_id, {state.id, {:manifest, state.manifest}}) - {:get, start} -> push_messages(peer_id, state, start, 20) + {:get, start} -> push_messages(state, peer_id, start, 20) {:info, _start, list, rest} -> if rest != nil and not ML.has(state.store, rest) do Shard.Manager.send(peer_id, {state.id, {:get, rest}}) @@ -131,7 +155,7 @@ defmodule SApp.Chat do end def handle_cast({:deferred_insert, list}, state) do - new_store = ML.insert_many(state.store, list, (fn msg -> msg_callback(state.channel, msg) end)) + new_store = ML.insert_many(state.store, list, (fn msg -> msg_callback(state, msg) end)) {:noreply, %{state | store: new_store}} end @@ -143,8 +167,12 @@ defmodule SApp.Chat do end end - defp msg_callback(chan, {ts, nick, msg}) do - IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} ##{chan} <#{nick}> #{msg}" + defp msg_callback(state, {ts, nick, msg}) do + for pid <- state.subs do + if Process.alive?(pid) do + send(pid, {:chat_recv, state.channel, {ts, nick, msg}}) + end + end end defp msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do diff --git a/lib/cli/cli.ex b/lib/cli/cli.ex index c281d57..8928040 100644 --- a/lib/cli/cli.ex +++ b/lib/cli/cli.ex @@ -8,6 +8,8 @@ defmodule SCLI do end defp run(pid) do + handle_messages() + nick = Shard.Identity.get_nickname prompt = case pid do nil -> "(no channel) #{nick}: " @@ -25,11 +27,25 @@ defmodule SCLI do pid2 = handle_command(pid, command) run(pid2) true -> - GenServer.cast(pid, {:chat_send, str}) + if str != "" do + GenServer.cast(pid, {:chat_send, str}) + end run(pid) end end + defp handle_messages() do + receive do + {:chat_recv, chan, {ts, nick, msg}} -> + IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} ##{chan} <#{nick}> #{msg}" + handle_messages() + {:chat_send, _, _} -> + # do nothing + handle_messages() + after 10 -> nil + end + end + defp handle_command(pid, ["connect", ipstr, portstr]) do {:ok, ip} = :inet.parse_address (to_charlist ipstr) {port, _} = Integer.parse portstr @@ -47,6 +63,19 @@ defmodule SCLI do pid end + defp handle_command(pid, ["hist"]) do + case GenServer.call(pid, {:read_history, nil, 100}) do + {:ok, list, _rest} -> + list + |> Enum.reverse + |> Enum.each(fn {ts, nick, msg} -> + IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}" + end) + _ -> nil + end + pid + end + defp handle_command(_pid, ["join", qchan]) do list = for {_chid, manifest, chpid} <- :ets.tab2list(:shard_db), {:chat, chan} = manifest, @@ -54,6 +83,7 @@ defmodule SCLI do case List.keyfind(list, qchan, 0) do nil -> {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, qchan}) + GenServer.cast(pid, {:subscribe, self()}) pid {_, pid} -> IO.puts "Switching to ##{qchan}" diff --git a/lib/data/merklelist.ex b/lib/data/merklelist.ex index 71483bd..357f336 100644 --- a/lib/data/merklelist.ex +++ b/lib/data/merklelist.ex @@ -78,7 +78,7 @@ defmodule SData.MerkleList do callback.(item) new_state :duplicate -> insert_many_aux(state, rest, callback) - :before -> push(insert_many_aux(state_rest, [item | rest], callback), item) + :before -> push(insert_many_aux(state_rest, [item | rest], callback), front) end end end diff --git a/lib/manager.ex b/lib/manager.ex index 45aae5f..87f95c5 100644 --- a/lib/manager.ex +++ b/lib/manager.ex @@ -28,6 +28,8 @@ defmodule Shard.Manager do use GenServer + require Logger + def start_link(my_port) do GenServer.start_link(__MODULE__, my_port, name: __MODULE__) end @@ -96,7 +98,9 @@ defmodule Shard.Manager do add_peer(ip, port, state) currtime = System.os_time :second :ets.insert(state.outbox, {peer_id, msg, currtime}) - outbox_cleanup = :ets.fun2ms(fn {_, _, t} when t < currtime - 60 -> true end) + outbox_cleanup = [{{:_, :_, :'$1'}, + [{:<, :'$1', currtime - 60}], + [:'$1']}] :ets.select_delete(state.outbox, outbox_cleanup) _ -> nil end @@ -121,9 +125,13 @@ defmodule Shard.Manager do defp add_peer(ip, port, state) do spawn fn -> - {:ok, client} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) - {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}}) - :ok = :gen_tcp.controlling_process(client, pid) + case :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) do + {:ok, client} -> + {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}}) + :ok = :gen_tcp.controlling_process(client, pid) + _ -> + Logger.info "Could not connect to #{inspect ip}:#{port}, some messages may be dropped" + end end end |