aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/app/blockstore.ex_78
-rw-r--r--lib/app/chat.ex37
-rw-r--r--lib/application.ex1
-rw-r--r--lib/cli/cli.ex13
-rw-r--r--lib/manager.ex146
-rw-r--r--lib/net/manager.ex65
-rw-r--r--lib/net/tcpconn.ex10
-rw-r--r--lib/web/httprouter.ex2
8 files changed, 229 insertions, 123 deletions
diff --git a/lib/app/blockstore.ex_ b/lib/app/blockstore.ex_
new file mode 100644
index 0000000..2854161
--- /dev/null
+++ b/lib/app/blockstore.ex_
@@ -0,0 +1,78 @@
+defmodule SApp.BlockStore do
+ @moduledoc """
+ A module that implements a content-adressable storage (blocks, or pages,
+ identified by the hash of their contents).
+
+ Establishes full node connectivity and uses rendez-vous hashing to select
+ which nodes are responsible of a given hash.
+
+ TODO: WIP
+ """
+
+ use GenServer
+
+ defmodule State do
+ defstruct [:name, :id, :manifest,
+ :ncopies,
+ :store, :peers]
+ end
+
+
+ def start_link(name) do
+ GenServer.start_link(__MODULE__, name)
+ end
+
+ def init(name) do
+ manifest = {:blockstore, name}
+ id = SData.term_hash manifest
+
+ GenServer.cast(Shard.Manager, {:register, id, self()})
+ GenServer.cast(self(), :init_pull)
+
+ {:ok, %State{name: name, id: id, manifest: manifest,
+ ncopies: 3,
+ store: %{}, peers: %{}}}
+ end
+
+ def handle_call(:manifest, _from, state) do
+ {:reply, state.manifest, state}
+ end
+
+ def handle_call({:get, key}, from, state) do
+ # TODO
+ end
+
+ def handle_call({:put, val}, state) do
+ # TODO
+ end
+
+ def handle_cast({:redundant, _}, _state) do
+ exit :normal
+ end
+
+ def handle_cast(:init_pull, state) do
+ GenServer.call(SNet.Manager, :get_all)
+ |> Enum.each(&(GenServer.cast(&1, {:send_msg, {:interested, [state.id]}})))
+ {:noreply, state}
+ end
+
+ def handle_cast({:interested, peer_id, peer_pid}, state) do
+ new_peers = Map.put(state.peers, peer_id, peer_pid)
+ new_state = %{ state | peers: new_peers }
+ initial_sync(new_state, peer_id, peer_pid)
+ {:noreply, new_state}
+ end
+
+ def handle_cast({:msg, peer_id, peer_pid, msg}, state) do
+ # TODO
+ {:noreply, state}
+ end
+
+ defp initial_sync(state, peer_id, peer_pid) do
+ # TODO
+ end
+
+ defp send(state, to, msg) do
+ GenServer.cast(to, {:send_msg, {state.id, msg}})
+ end
+end
diff --git a/lib/app/chat.ex b/lib/app/chat.ex
index fe2777e..696e53c 100644
--- a/lib/app/chat.ex
+++ b/lib/app/chat.ex
@@ -34,10 +34,10 @@ defmodule SApp.Chat do
manifest = {:chat, channel}
id = SData.term_hash manifest
- GenServer.cast(Shard.Manager, {:register, id, self()})
+ GenServer.cast(Shard.Manager, {:register, id, manifest, self()})
GenServer.cast(self(), :init_pull)
- {:ok, %{channel: channel, id: id, manifest: manifest, store: store, peers: %{}}}
+ {:ok, %{channel: channel, id: id, manifest: manifest, store: store, peers: MapSet.new}}
end
@doc """
@@ -61,8 +61,9 @@ defmodule SApp.Chat do
send data for this channel if they have some.
"""
def handle_cast(:init_pull, state) do
- GenServer.call(SNet.Manager, :get_all)
- |> Enum.each(&(GenServer.cast(&1, {:send_msg, {:interested, [state.id]}})))
+ for {_, pid, _, _} <- :ets.tab2list(:peer_db) do
+ GenServer.cast(pid, {:send_msg, {:interested, [state.id]}})
+ end
{:noreply, state}
end
@@ -77,8 +78,8 @@ defmodule SApp.Chat do
msg}
GenServer.cast(state.store, {:insert, msgitem})
- for {_, pid} <- state.peers do
- push_messages(state, pid, nil, 5)
+ for peer <- state.peers do
+ push_messages(state, peer, nil, 5)
end
{:noreply, state}
@@ -88,9 +89,9 @@ defmodule SApp.Chat do
Implementation of the :interested handler, this is called when a peer we are
connected to asks to recieve data for this channel.
"""
- def handle_cast({:interested, peer_id, peer_pid}, state) do
- push_messages(state, peer_pid, nil, 10)
- new_peers = Map.put(state.peers, peer_id, peer_pid)
+ def handle_cast({:interested, peer_id}, state) do
+ push_messages(state, peer_id, nil, 10)
+ new_peers = MapSet.put(state.peers, peer_id)
{:noreply, %{ state | peers: new_peers }}
end
@@ -103,13 +104,14 @@ defmodule SApp.Chat do
- `{:info, start, list, rest}`: put some messages and informs of the
Merkle hash of the store of older messages.
"""
- def handle_cast({:msg, peer_id, peer_pid, msg}, state) do
+ def handle_cast({:msg, peer_id, msg}, state) do
case msg do
- {:get_manifest} -> send(state, peer_pid, {:manifest, state.manifest})
+ {:get_manifest} ->
+ Shard.Manager.send(peer_id, {state.id, {:manifest, state.manifest}})
{:get, start} -> push_messages(peer_id, state, start, 20)
{:info, _start, list, rest} ->
if rest != nil and not GenServer.call(state.store, {:has, rest}) do
- send(state, peer_pid, {:get, rest})
+ Shard.Manager.send(peer_id, {state.id, {:get, rest}})
end
spawn_link(fn ->
Process.sleep 1000
@@ -118,22 +120,17 @@ defmodule SApp.Chat do
_ -> nil
end
- if Map.has_key?(state.peers, peer_id) do
+ if MapSet.member?(state.peers, peer_id) do
{:noreply, state}
else
- handle_cast({:interested, peer_id, peer_pid}, state)
+ handle_cast({:interested, peer_id}, state)
end
end
- defp send(state, to, msg) do
- GenServer.cast(to, {:send_msg, {state.id, msg}})
- end
-
-
defp push_messages(state, to, start, num) do
case GenServer.call(state.store, {:read, start, num}) do
{:ok, list, rest} ->
- send(state, to, {:info, start, list, rest})
+ Shard.Manager.send(to, {state.id, {:info, start, list, rest}})
_ -> nil
end
end
diff --git a/lib/application.ex b/lib/application.ex
index 3ba6e12..5262677 100644
--- a/lib/application.ex
+++ b/lib/application.ex
@@ -19,7 +19,6 @@ defmodule Shard.Application do
{ DynamicSupervisor, strategy: :one_for_one, name: Shard.DynamicSupervisor },
# Networking
- { SNet.Manager, listen_port },
{ SNet.TCPServer, listen_port },
# Applications & data store
diff --git a/lib/cli/cli.ex b/lib/cli/cli.ex
index d2e5f6a..3f83cb1 100644
--- a/lib/cli/cli.ex
+++ b/lib/cli/cli.ex
@@ -33,24 +33,23 @@ defmodule SCLI do
defp handle_command(pid, ["connect", ipstr, portstr]) do
{:ok, ip} = :inet.parse_address (to_charlist ipstr)
{port, _} = Integer.parse portstr
- SNet.Manager.add_peer(ip, port)
+ Shard.Manager.add_peer(ip, port)
pid
end
defp handle_command(pid, ["list"]) do
IO.puts "List of known channels:"
- list = GenServer.call(Shard.Manager, :list)
- for {_chid, chpid} <- list do
- {:chat, chan} = GenServer.call(chpid, :manifest)
+
+ for {_chid, manifest, _chpid} <- :ets.tab2list(:shard_db) do
+ {:chat, chan} = manifest
IO.puts "##{chan}"
end
pid
end
defp handle_command(pid, ["join", qchan]) do
- list = GenServer.call(Shard.Manager, :list)
- list = for {_chid, chpid} <- list,
- {:chat, chan} = GenServer.call(chpid, :manifest),
+ list = for {_chid, manifest, chpid} <- :ets.tab2list(:shard_db),
+ {:chat, chan} = manifest,
do: {chan, chpid}
case List.keyfind(list, qchan, 0) do
nil ->
diff --git a/lib/manager.ex b/lib/manager.ex
index ce11117..07295fa 100644
--- a/lib/manager.ex
+++ b/lib/manager.ex
@@ -1,46 +1,144 @@
defmodule Shard.Manager do
+ @moduledoc"""
+ Maintains three tables :
+
+ - :peer_db
+
+ List of
+ { id, {conn_pid, con_start, conn_n_msg} | nil, ip, port, last_seen }
+
+ - :shard_db
+
+ List of
+ { id, manifest, shard_pid }
+
+ - :peer_shard_db
+
+ Mult-list of
+ { peer_id, shard_id }
+
+ """
+
use GenServer
- def start_link(_) do
- GenServer.start_link(__MODULE__, nil, name: __MODULE__)
+ def start_link(my_port) do
+ GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
end
- def init(_) do
- state = %{
- shards: %{}
- }
- {:ok, state}
+ def init(my_port) do
+ :ets.new(:peer_db, [:set, :protected, :named_table])
+ :ets.new(:shard_db, [:set, :protected, :named_table])
+ peer_shard_db = :ets.new(:peer_shard_db, [:bag, :private])
+ outbox = :ets.new(:outbox, [:bag, :private])
+
+ {:ok, %{my_port: my_port, peer_shard_db: peer_shard_db, outbox: outbox} }
end
def handle_call({:find, shard_id}, _from, state) do
- {:reply, state.shards[shard_id], state}
+ reply = case :ets.lookup(:shard_db, shard_id) do
+ [{ ^shard_id, _, pid }] -> {:ok, pid}
+ [] -> :not_found
+ end
+ {:reply, reply, state}
+ end
+
+ def handle_cast({:register, shard_id, manifest, pid}, state) do
+ will_live = case :ets.lookup(:shard_db, shard_id) do
+ [{ ^shard_id, _, pid }] -> not Process.alive?(pid)
+ _ -> true
+ end
+ if will_live do
+ :ets.insert(:shard_db, {shard_id, manifest, pid})
+ else
+ GenServer.cast(pid, {:redundant, shard_id})
+ end
+ {:noreply, state}
end
- def handle_call(:list, _from, state) do
- {:reply, Map.to_list(state.shards), state}
+ def handle_cast({:interested, peer_id, shards}, state) do
+ for shard_id <- shards do
+ case :ets.lookup(:shard_db, shard_id) do
+ [{ ^shard_id, _, pid }] ->
+ :ets.insert(state.peer_shard_db, {peer_id, shard_id})
+ GenServer.cast(pid, {:interested, peer_id})
+ [] -> nil
+ end
+ end
+ {:noreply, state}
end
- def handle_cast({:register, shard_id, pid}, state) do
- if Map.has_key?(state.shards, shard_id) do
- GenServer.cast(pid, {:redundant, shard_id})
- state
- else
- new_shards = Map.put(state.shards, shard_id, pid)
- {:noreply, %{ state | shards: new_shards }}
+
+
+ def handle_cast({:peer_up, pk, pid, ip, port}, state) do
+ :ets.insert(:peer_db, {pk, pid, ip, port})
+ for {_, msg, _} <- :ets.lookup(state.outbox, pk) do
+ GenServer.cast(pid, {:send_msg, msg})
end
+ :ets.delete(state.outbox, pk)
+ {:noreply, state}
+ end
+
+ def handle_cast({:peer_down, pk, ip, port}, state) do
+ :ets.insert(:peer_db, {pk, nil, ip, port})
+ {:noreply, state}
end
- def handle_cast({:interested, peer_id, peer_pid, shards}, state) do
- shards
- |> Enum.filter(&(Map.has_key?(state.shards, &1)))
- |> Enum.each(&(GenServer.cast(state.shards[&1], {:interested, peer_id, peer_pid})))
+ def handle_cast({:connect_and_send, peer_id, msg}, state) do
+ case :ets.lookup(:peer_db, peer_id) do
+ [{^peer_id, nil, ip, port}] ->
+ add_peer(ip, port, state)
+ currtime = System.os_time :second
+ :ets.insert(state.outbox, {peer_id, msg, currtime})
+ outbox_cleanup = :ets.fun2ms(fn {_, _, t} when t < currtime - 60 -> true end)
+ :ets.select_delete(state.outbox, outbox_cleanup)
+ _ -> nil
+ end
{:noreply, state}
end
- def handle_cast({:dispatch, peer_id, peer_pid, shard, msg}, state) do
- if Map.has_key?(state.shards, shard) do
- GenServer.cast(state.shards[shard], {:msg, peer_id, peer_pid, msg})
+ def handle_cast({:try_connect, pk_list}, state) do
+ for pk <- pk_list do
+ case :ets.lookup(:peer_db, pk) do
+ [{^pk, nil, ip, port}] ->
+ add_peer(ip, port, state)
+ _ -> nil
+ end
end
{:noreply, state}
end
+
+ def handle_cast({:add_peer, ip, port}, state) do
+ add_peer(ip, port, state)
+ {:noreply, state}
+ end
+
+ defp add_peer(ip, port, state) do
+ spawn fn ->
+ {:ok, client} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false])
+ {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}})
+ :ok = :gen_tcp.controlling_process(client, pid)
+ end
+ end
+
+ def add_peer(ip, port) do
+ GenServer.cast(__MODULE__, {:add_peer, ip, port})
+ end
+
+ def send(peer_id, msg) do
+ case :ets.lookup(:peer_db, peer_id) do
+ [{ ^peer_id, pid, _, _}] when pid != nil->
+ GenServer.cast(pid, {:send_msg, msg})
+ _ ->
+ GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg})
+ end
+ end
+
+ def dispatch(peer_id, shard_id, msg) do
+ case :ets.lookup(:shard_db, shard_id) do
+ [{ ^shard_id, _, pid }] when pid != nil ->
+ GenServer.cast(pid, {:msg, peer_id, msg})
+ [_] -> nil # TODO restart shard
+ [] -> nil # TODO send not interested
+ end
+ end
end
diff --git a/lib/net/manager.ex b/lib/net/manager.ex
deleted file mode 100644
index 4b1ce94..0000000
--- a/lib/net/manager.ex
+++ /dev/null
@@ -1,65 +0,0 @@
-defmodule SNet.Manager do
- use GenServer
-
- def start_link(my_port) do
- GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
- end
-
- def init(my_port) do
- state = %{
- peers: %{},
- my_port: my_port
- }
- {:ok, state}
- end
-
- def handle_cast({:peer_up, pk, pid, addr, ip}, state) do
- new_peers = Map.put(state.peers, pk, {pid, addr, ip})
- new_state = %{ state | peers: new_peers }
- {:noreply, new_state}
- end
-
- def handle_cast({:peer_down, pk, addr, ip}, state) do
- new_peers = Map.put(state.peers, pk, {nil, addr, ip})
- new_state = %{ state | peers: new_peers }
- {:noreply, new_state}
- end
-
- def handle_cast({:add_peer, ip, port}, state) do
- add_peer(ip, port, state.my_port)
- {:noreply, state}
- end
-
- def handle_cast({:try_connect, pk_list}, state) do
- for pk <- pk_list do
- case state.peers[pk] do
- {nil, ip, port} -> add_peer(ip, port)
- _ -> nil
- end
- end
- {:noreply, state}
- end
-
- def handle_call(:get_all, _from, state) do
- pid_list = (for {_, {pid, _, _}} <- state.peers, pid != nil, do: pid)
- {:reply, pid_list, state}
- end
-
- def handle_call({:get_connections, pk_list}, _from, state) do
- pid_list = (for pk <- pk_list, Map.has_key?(state.peers, pk), do: state.peers[pk])
- |> Enum.map(fn {pid, _, _} -> pid end)
- |> Enum.filter(&(&1 != nil))
- {:ok, pid_list, state}
- end
-
- def add_peer(ip, port) do
- GenServer.cast(__MODULE__, {:add_peer, ip, port})
- end
-
- def add_peer(ip, port, my_port) do
- {:ok, client} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false])
- {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port}})
- :ok = :gen_tcp.controlling_process(client, pid)
- pid
- end
-end
diff --git a/lib/net/tcpconn.ex b/lib/net/tcpconn.ex
index 64c85e9..a16a62e 100644
--- a/lib/net/tcpconn.ex
+++ b/lib/net/tcpconn.ex
@@ -53,7 +53,7 @@ defmodule SNet.TCPConn do
addr: addr,
port: port
}
- GenServer.cast(SNet.Manager, {:peer_up, cli_pkey, self(), addr, his_port})
+ GenServer.cast(Shard.Manager, {:peer_up, cli_pkey, self(), addr, his_port})
Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}"
GenServer.cast(self(), :init_pull)
@@ -66,7 +66,7 @@ defmodule SNet.TCPConn do
end
def handle_cast(:init_pull, state) do
- id_list = (for {id, _} <- GenServer.call(Shard.Manager, :list), do: id)
+ id_list = (for {id, _, _} <- :ets.tab2list(:shard_db), do: id)
send_msg(state, {:interested, id_list})
{:noreply, state}
end
@@ -98,16 +98,16 @@ defmodule SNet.TCPConn do
def handle_info({:tcp_closed, _socket}, state) do
Logger.info "Disconnected: #{print_id state} at #{inspect state.addr}:#{state.port}"
- GenServer.cast(SNet.Manager, {:peer_down, state.his_pkey, state.addr, state.port})
+ GenServer.cast(Shard.Manager, {:peer_down, state.his_pkey, state.addr, state.port})
exit(:normal)
end
defp handle_packet({:interested, shards}, state) do
- GenServer.cast(Shard.Manager, {:interested, state.his_pkey, self(), shards})
+ GenServer.cast(Shard.Manager, {:interested, state.his_pkey, shards})
end
defp handle_packet({shard, msg}, state) do
- GenServer.cast(Shard.Manager, {:dispatch, state.his_pkey, self(), shard, msg})
+ Shard.Manager.dispatch(state.his_pkey, shard, msg)
end
defp print_id(state) do
diff --git a/lib/web/httprouter.ex b/lib/web/httprouter.ex
index d37413d..57af9f9 100644
--- a/lib/web/httprouter.ex
+++ b/lib/web/httprouter.ex
@@ -22,7 +22,7 @@ defmodule SWeb.HTTPRouter do
[ipstr, portstr] = String.split(conn.params["peer"], ":")
{:ok, ip} = :inet.parse_address (to_charlist ipstr)
{port, _} = Integer.parse portstr
- SNet.Manager.add_peer(ip, port)
+ Shard.Manager.add_peer(ip, port)
end
main_page(conn)