aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/app/chat.ex83
-rw-r--r--lib/application.ex4
-rw-r--r--lib/cli/cli.ex56
-rw-r--r--lib/data/merklelist.ex4
-rw-r--r--lib/manager.ex46
-rw-r--r--lib/net/manager.ex9
-rw-r--r--lib/net/tcpconn.ex34
-rw-r--r--lib/net/tcpserver.ex2
8 files changed, 191 insertions, 47 deletions
diff --git a/lib/app/chat.ex b/lib/app/chat.ex
index 4a56085..f165514 100644
--- a/lib/app/chat.ex
+++ b/lib/app/chat.ex
@@ -1,17 +1,86 @@
defmodule SApp.Chat do
- def send(msg) do
+ use GenServer
+
+ def start_link(channel) do
+ GenServer.start_link(__MODULE__, channel)
+ end
+
+ def init(channel) do
+ {:ok, store} = SData.MerkleList.start_link(&msg_cmp/2)
+ manifest = {:chat, channel}
+ id = :crypto.hash(:sha256, :erlang.term_to_binary(manifest))
+
+ GenServer.cast(Shard.Manager, {:register, id, self()})
+ GenServer.cast(self(), :init_pull)
+
+ {:ok, %{channel: channel, id: id, manifest: manifest, store: store, peers: %{}}}
+ end
+
+ def handle_call(:manifest, _from, state) do
+ {:reply, state.manifest, state}
+ end
+
+ def handle_cast({:redundant, _}, _state) do
+ exit :normal
+ end
+
+ def handle_cast(:init_pull, state) do
+ GenServer.call(SNet.Manager, :get_all)
+ |> Enum.each(&(GenServer.cast(&1, {:send_msg, {:interested, [state.id]}})))
+ {:noreply, state}
+ end
+
+ def handle_cast({:chat_send, msg}, state) do
msgitem = {(System.os_time :seconds),
Shard.Identity.get_nickname(),
msg}
- GenServer.cast(SApp.Chat.Log, {:insert, msgitem})
+ GenServer.cast(state.store, {:insert, msgitem})
+
+ for {_, pid} <- state.peers do
+ push_messages(state, pid, nil, 5)
+ end
- SNet.ConnSupervisor
- |> DynamicSupervisor.which_children
- |> Enum.each(fn {_, pid, _, _} -> GenServer.cast(pid, :init_push) end)
+ {:noreply, state}
end
- def msg_callback({ts, nick, msg}) do
- IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}"
+ def handle_cast({:interested, peer_id, peer_pid}, state) do
+ push_messages(state, peer_pid, nil, 10)
+ new_peers = Map.put(state.peers, peer_id, peer_pid)
+ {:noreply, %{ state | peers: new_peers }}
+ end
+
+ def handle_cast({:msg, peer_id, peer_pid, msg}, state) do
+ case msg do
+ :get_top -> push_messages(peer_id, state, nil, 10)
+ {:get, start} -> push_messages(peer_id, state, start, 20)
+ {:info, _start, list, rest} ->
+ if rest != nil and not GenServer.call(state.store, {:has, rest}) do
+ GenServer.cast(peer_pid, {:send_msg, {state.id, {:get, rest}}})
+ end
+ spawn_link(fn ->
+ Process.sleep 1000
+ GenServer.cast(state.store, {:insert_many, list, (fn msg -> msg_callback(state.channel, msg) end)})
+ end)
+ end
+
+ if Map.has_key?(state.peers, peer_id) do
+ {:noreply, state}
+ else
+ handle_cast({:interested, peer_id, peer_pid}, state)
+ end
+ end
+
+ defp push_messages(state, to, start, num) do
+ case GenServer.call(state.store, {:read, start, num}) do
+ {:ok, list, rest} ->
+ GenServer.cast(to, {:send_msg, {state.id, {:info, start, list, rest}}})
+ _ -> nil
+ end
+ end
+
+
+ def msg_callback(chan, {ts, nick, msg}) do
+ IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} ##{chan} <#{nick}> #{msg}"
end
def msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do
diff --git a/lib/application.ex b/lib/application.ex
index 3ad9325..e375cf4 100644
--- a/lib/application.ex
+++ b/lib/application.ex
@@ -13,14 +13,14 @@ defmodule Shard.Application do
# Define workers and child supervisors to be supervised
children = [
Shard.Identity,
+ { DynamicSupervisor, strategy: :one_for_one, name: Shard.DynamicSupervisor },
# Networking
{ SNet.Manager, listen_port },
{ SNet.TCPServer, listen_port },
- { DynamicSupervisor, strategy: :one_for_one, name: SNet.ConnSupervisor },
# Applications & data store
- { SData.MerkleList, [&SApp.Chat.msg_cmp/2, name: SApp.Chat.Log] },
+ Shard.Manager,
# Web UI
Plug.Adapters.Cowboy.child_spec(:http, SWeb.HTTPRouter, [], port: listen_port + 1000)
diff --git a/lib/cli/cli.ex b/lib/cli/cli.ex
index 28ef5d0..0402b3f 100644
--- a/lib/cli/cli.ex
+++ b/lib/cli/cli.ex
@@ -1,30 +1,70 @@
defmodule SCLI do
def run() do
- str = "say: " |> IO.gets |> String.trim
+ run(nil)
+ end
+
+ defp run(pid) do
+ nick = Shard.Identity.get_nickname
+ prompt = case pid do
+ nil -> "(no channel) #{nick}: "
+ _ ->
+ {:chat, chan} = GenServer.call(pid, :manifest)
+ "##{chan} #{nick}: "
+ end
+
+ str = prompt |> IO.gets |> String.trim
cond do
str == "/quit" ->
nil
String.slice(str, 0..0) == "/" ->
command = str |> String.slice(1..-1) |> String.split(" ")
- handle_command(command)
- run()
+ pid2 = handle_command(pid, command)
+ run(pid2)
true ->
- SApp.Chat.send(str)
- run()
+ GenServer.cast(pid, {:chat_send, str})
+ run(pid)
end
end
- def handle_command(["connect", ipstr, portstr]) do
+ def handle_command(pid, ["connect", ipstr, portstr]) do
{:ok, ip} = :inet.parse_address (to_charlist ipstr)
{port, _} = Integer.parse portstr
SNet.Manager.add_peer(ip, port)
+ pid
+ end
+
+ def handle_command(pid, ["list"]) do
+ IO.puts "List of known channels:"
+ list = GenServer.call(Shard.Manager, :list)
+ for {_chid, chpid} <- list do
+ {:chat, chan} = GenServer.call(chpid, :manifest)
+ IO.puts "##{chan}"
+ end
+ pid
+ end
+
+ def handle_command(pid, ["join", qchan]) do
+ list = GenServer.call(Shard.Manager, :list)
+ list = for {_chid, chpid} <- list,
+ {:chat, chan} = GenServer.call(chpid, :manifest),
+ do: {chan, chpid}
+ case List.keyfind(list, qchan, 0) do
+ nil ->
+ {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, qchan})
+ pid
+ {_, pid} ->
+ IO.puts "Switching to ##{qchan}"
+ pid
+ end
end
- def handle_command(["nick", nick]) do
+ def handle_command(pid, ["nick", nick]) do
Shard.Identity.set_nickname nick
+ pid
end
- def handle_command(_cmd) do
+ def handle_command(pid, _cmd) do
IO.puts "Invalid command"
+ pid
end
end
diff --git a/lib/data/merklelist.ex b/lib/data/merklelist.ex
index c9e27f6..42b80fc 100644
--- a/lib/data/merklelist.ex
+++ b/lib/data/merklelist.ex
@@ -1,8 +1,8 @@
defmodule SData.MerkleList do
use GenServer
- def start_link([cmp, name: name]) do
- GenServer.start_link(__MODULE__, cmp, [name: name])
+ def start_link(cmp) do
+ GenServer.start_link(__MODULE__, cmp)
end
defp term_hash(term) do
diff --git a/lib/manager.ex b/lib/manager.ex
new file mode 100644
index 0000000..ce11117
--- /dev/null
+++ b/lib/manager.ex
@@ -0,0 +1,46 @@
+defmodule Shard.Manager do
+ use GenServer
+
+ def start_link(_) do
+ GenServer.start_link(__MODULE__, nil, name: __MODULE__)
+ end
+
+ def init(_) do
+ state = %{
+ shards: %{}
+ }
+ {:ok, state}
+ end
+
+ def handle_call({:find, shard_id}, _from, state) do
+ {:reply, state.shards[shard_id], state}
+ end
+
+ def handle_call(:list, _from, state) do
+ {:reply, Map.to_list(state.shards), state}
+ end
+
+ def handle_cast({:register, shard_id, pid}, state) do
+ if Map.has_key?(state.shards, shard_id) do
+ GenServer.cast(pid, {:redundant, shard_id})
+ state
+ else
+ new_shards = Map.put(state.shards, shard_id, pid)
+ {:noreply, %{ state | shards: new_shards }}
+ end
+ end
+
+ def handle_cast({:interested, peer_id, peer_pid, shards}, state) do
+ shards
+ |> Enum.filter(&(Map.has_key?(state.shards, &1)))
+ |> Enum.each(&(GenServer.cast(state.shards[&1], {:interested, peer_id, peer_pid})))
+ {:noreply, state}
+ end
+
+ def handle_cast({:dispatch, peer_id, peer_pid, shard, msg}, state) do
+ if Map.has_key?(state.shards, shard) do
+ GenServer.cast(state.shards[shard], {:msg, peer_id, peer_pid, msg})
+ end
+ {:noreply, state}
+ end
+end
diff --git a/lib/net/manager.ex b/lib/net/manager.ex
index e5eb12d..4b1ce94 100644
--- a/lib/net/manager.ex
+++ b/lib/net/manager.ex
@@ -40,7 +40,12 @@ defmodule SNet.Manager do
{:noreply, state}
end
- def handle_call({:get_connections, pk_list}, state) do
+ def handle_call(:get_all, _from, state) do
+ pid_list = (for {_, {pid, _, _}} <- state.peers, pid != nil, do: pid)
+ {:reply, pid_list, state}
+ end
+
+ def handle_call({:get_connections, pk_list}, _from, state) do
pid_list = (for pk <- pk_list, Map.has_key?(state.peers, pk), do: state.peers[pk])
|> Enum.map(fn {pid, _, _} -> pid end)
|> Enum.filter(&(&1 != nil))
@@ -53,7 +58,7 @@ defmodule SNet.Manager do
def add_peer(ip, port, my_port) do
{:ok, client} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false])
- {:ok, pid} = DynamicSupervisor.start_child(SNet.ConnSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port}})
+ {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port}})
:ok = :gen_tcp.controlling_process(client, pid)
pid
end
diff --git a/lib/net/tcpconn.ex b/lib/net/tcpconn.ex
index 301e931..64c85e9 100644
--- a/lib/net/tcpconn.ex
+++ b/lib/net/tcpconn.ex
@@ -53,10 +53,9 @@ defmodule SNet.TCPConn do
addr: addr,
port: port
}
- GenServer.cast(SNet.Manager, {:peer_up, self(), cli_pkey, addr, his_port})
+ GenServer.cast(SNet.Manager, {:peer_up, cli_pkey, self(), addr, his_port})
Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}"
-
- GenServer.cast(self(), :init_push)
+ GenServer.cast(self(), :init_pull)
{:noreply, state}
end
@@ -66,8 +65,9 @@ defmodule SNet.TCPConn do
{:noreply, state}
end
- def handle_cast(:init_push, state) do
- push_messages(state, nil, 10)
+ def handle_cast(:init_pull, state) do
+ id_list = (for {id, _} <- GenServer.call(Shard.Manager, :list), do: id)
+ send_msg(state, {:interested, id_list})
{:noreply, state}
end
@@ -102,28 +102,12 @@ defmodule SNet.TCPConn do
exit(:normal)
end
- defp push_messages(state, start, num) do
- case GenServer.call(SApp.Chat.Log, {:read, start, num}) do
- {:ok, list, rest} ->
- send_msg(state, {:info, start, list, rest})
- _ -> nil
- end
+ defp handle_packet({:interested, shards}, state) do
+ GenServer.cast(Shard.Manager, {:interested, state.his_pkey, self(), shards})
end
- defp handle_packet(msg, state) do
- # Logger.info "Message: #{inspect msg}"
- case msg do
- :get_top -> push_messages(state, nil, 10)
- {:get, start} -> push_messages(state, start, 20)
- {:info, _start, list, rest} ->
- if rest != nil and not GenServer.call(SApp.Chat.Log, {:has, rest}) do
- send_msg(state, {:get, rest})
- end
- spawn_link(fn ->
- Process.sleep 1000
- GenServer.cast(SApp.Chat.Log, {:insert_many, list, &SApp.Chat.msg_callback/1})
- end)
- end
+ defp handle_packet({shard, msg}, state) do
+ GenServer.cast(Shard.Manager, {:dispatch, state.his_pkey, self(), shard, msg})
end
defp print_id(state) do
diff --git a/lib/net/tcpserver.ex b/lib/net/tcpserver.ex
index 7c758c1..46552a4 100644
--- a/lib/net/tcpserver.ex
+++ b/lib/net/tcpserver.ex
@@ -18,7 +18,7 @@ defmodule SNet.TCPServer do
defp loop_acceptor(socket, my_port) do
{:ok, client} = :gen_tcp.accept(socket)
- {:ok, pid} = DynamicSupervisor.start_child(SNet.ConnSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port}})
+ {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port}})
:ok = :gen_tcp.controlling_process(client, pid)
loop_acceptor(socket, my_port)
end