aboutsummaryrefslogtreecommitdiff
path: root/lib/manager.ex
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2018-07-19 17:08:23 +0200
committerAlex Auvolat <alex@adnab.me>2018-07-19 17:08:23 +0200
commit058bab0d7097405126566360308ace986c18ff8e (patch)
treeeaf3ca0d607829af3ad07bdb51bb170b70f8eef5 /lib/manager.ex
parent582f1d65463f8f5cbcc34c6129670b473793c4dd (diff)
downloadshard-058bab0d7097405126566360308ace986c18ff8e.tar.gz
shard-058bab0d7097405126566360308ace986c18ff8e.zip
Refactoring ; template for block store
Diffstat (limited to 'lib/manager.ex')
-rw-r--r--lib/manager.ex146
1 files changed, 122 insertions, 24 deletions
diff --git a/lib/manager.ex b/lib/manager.ex
index ce11117..07295fa 100644
--- a/lib/manager.ex
+++ b/lib/manager.ex
@@ -1,46 +1,144 @@
defmodule Shard.Manager do
+ @moduledoc"""
+ Maintains three tables :
+
+ - :peer_db
+
+ List of
+ { id, {conn_pid, con_start, conn_n_msg} | nil, ip, port, last_seen }
+
+ - :shard_db
+
+ List of
+ { id, manifest, shard_pid }
+
+ - :peer_shard_db
+
+ Mult-list of
+ { peer_id, shard_id }
+
+ """
+
use GenServer
- def start_link(_) do
- GenServer.start_link(__MODULE__, nil, name: __MODULE__)
+ def start_link(my_port) do
+ GenServer.start_link(__MODULE__, my_port, name: __MODULE__)
end
- def init(_) do
- state = %{
- shards: %{}
- }
- {:ok, state}
+ def init(my_port) do
+ :ets.new(:peer_db, [:set, :protected, :named_table])
+ :ets.new(:shard_db, [:set, :protected, :named_table])
+ peer_shard_db = :ets.new(:peer_shard_db, [:bag, :private])
+ outbox = :ets.new(:outbox, [:bag, :private])
+
+ {:ok, %{my_port: my_port, peer_shard_db: peer_shard_db, outbox: outbox} }
end
def handle_call({:find, shard_id}, _from, state) do
- {:reply, state.shards[shard_id], state}
+ reply = case :ets.lookup(:shard_db, shard_id) do
+ [{ ^shard_id, _, pid }] -> {:ok, pid}
+ [] -> :not_found
+ end
+ {:reply, reply, state}
+ end
+
+ def handle_cast({:register, shard_id, manifest, pid}, state) do
+ will_live = case :ets.lookup(:shard_db, shard_id) do
+ [{ ^shard_id, _, pid }] -> not Process.alive?(pid)
+ _ -> true
+ end
+ if will_live do
+ :ets.insert(:shard_db, {shard_id, manifest, pid})
+ else
+ GenServer.cast(pid, {:redundant, shard_id})
+ end
+ {:noreply, state}
end
- def handle_call(:list, _from, state) do
- {:reply, Map.to_list(state.shards), state}
+ def handle_cast({:interested, peer_id, shards}, state) do
+ for shard_id <- shards do
+ case :ets.lookup(:shard_db, shard_id) do
+ [{ ^shard_id, _, pid }] ->
+ :ets.insert(state.peer_shard_db, {peer_id, shard_id})
+ GenServer.cast(pid, {:interested, peer_id})
+ [] -> nil
+ end
+ end
+ {:noreply, state}
end
- def handle_cast({:register, shard_id, pid}, state) do
- if Map.has_key?(state.shards, shard_id) do
- GenServer.cast(pid, {:redundant, shard_id})
- state
- else
- new_shards = Map.put(state.shards, shard_id, pid)
- {:noreply, %{ state | shards: new_shards }}
+
+
+ def handle_cast({:peer_up, pk, pid, ip, port}, state) do
+ :ets.insert(:peer_db, {pk, pid, ip, port})
+ for {_, msg, _} <- :ets.lookup(state.outbox, pk) do
+ GenServer.cast(pid, {:send_msg, msg})
end
+ :ets.delete(state.outbox, pk)
+ {:noreply, state}
+ end
+
+ def handle_cast({:peer_down, pk, ip, port}, state) do
+ :ets.insert(:peer_db, {pk, nil, ip, port})
+ {:noreply, state}
end
- def handle_cast({:interested, peer_id, peer_pid, shards}, state) do
- shards
- |> Enum.filter(&(Map.has_key?(state.shards, &1)))
- |> Enum.each(&(GenServer.cast(state.shards[&1], {:interested, peer_id, peer_pid})))
+ def handle_cast({:connect_and_send, peer_id, msg}, state) do
+ case :ets.lookup(:peer_db, peer_id) do
+ [{^peer_id, nil, ip, port}] ->
+ add_peer(ip, port, state)
+ currtime = System.os_time :second
+ :ets.insert(state.outbox, {peer_id, msg, currtime})
+ outbox_cleanup = :ets.fun2ms(fn {_, _, t} when t < currtime - 60 -> true end)
+ :ets.select_delete(state.outbox, outbox_cleanup)
+ _ -> nil
+ end
{:noreply, state}
end
- def handle_cast({:dispatch, peer_id, peer_pid, shard, msg}, state) do
- if Map.has_key?(state.shards, shard) do
- GenServer.cast(state.shards[shard], {:msg, peer_id, peer_pid, msg})
+ def handle_cast({:try_connect, pk_list}, state) do
+ for pk <- pk_list do
+ case :ets.lookup(:peer_db, pk) do
+ [{^pk, nil, ip, port}] ->
+ add_peer(ip, port, state)
+ _ -> nil
+ end
end
{:noreply, state}
end
+
+ def handle_cast({:add_peer, ip, port}, state) do
+ add_peer(ip, port, state)
+ {:noreply, state}
+ end
+
+ defp add_peer(ip, port, state) do
+ spawn fn ->
+ {:ok, client} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false])
+ {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}})
+ :ok = :gen_tcp.controlling_process(client, pid)
+ end
+ end
+
+ def add_peer(ip, port) do
+ GenServer.cast(__MODULE__, {:add_peer, ip, port})
+ end
+
+ def send(peer_id, msg) do
+ case :ets.lookup(:peer_db, peer_id) do
+ [{ ^peer_id, pid, _, _}] when pid != nil->
+ GenServer.cast(pid, {:send_msg, msg})
+ _ ->
+ GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg})
+ end
+ end
+
+ def dispatch(peer_id, shard_id, msg) do
+ case :ets.lookup(:shard_db, shard_id) do
+ [{ ^shard_id, _, pid }] when pid != nil ->
+ GenServer.cast(pid, {:msg, peer_id, msg})
+ [_] -> nil # TODO restart shard
+ [] -> nil # TODO send not interested
+ end
+ end
end