diff options
author | Alex Auvolat <alex@adnab.me> | 2018-07-19 17:08:23 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2018-07-19 17:08:23 +0200 |
commit | 058bab0d7097405126566360308ace986c18ff8e (patch) | |
tree | eaf3ca0d607829af3ad07bdb51bb170b70f8eef5 /lib/manager.ex | |
parent | 582f1d65463f8f5cbcc34c6129670b473793c4dd (diff) | |
download | shard-058bab0d7097405126566360308ace986c18ff8e.tar.gz shard-058bab0d7097405126566360308ace986c18ff8e.zip |
Refactoring ; template for block store
Diffstat (limited to 'lib/manager.ex')
-rw-r--r-- | lib/manager.ex | 146 |
1 files changed, 122 insertions, 24 deletions
diff --git a/lib/manager.ex b/lib/manager.ex index ce11117..07295fa 100644 --- a/lib/manager.ex +++ b/lib/manager.ex @@ -1,46 +1,144 @@ defmodule Shard.Manager do + @moduledoc""" + Maintains three tables : + + - :peer_db + + List of + { id, {conn_pid, con_start, conn_n_msg} | nil, ip, port, last_seen } + + - :shard_db + + List of + { id, manifest, shard_pid } + + - :peer_shard_db + + Mult-list of + { peer_id, shard_id } + + """ + use GenServer - def start_link(_) do - GenServer.start_link(__MODULE__, nil, name: __MODULE__) + def start_link(my_port) do + GenServer.start_link(__MODULE__, my_port, name: __MODULE__) end - def init(_) do - state = %{ - shards: %{} - } - {:ok, state} + def init(my_port) do + :ets.new(:peer_db, [:set, :protected, :named_table]) + :ets.new(:shard_db, [:set, :protected, :named_table]) + peer_shard_db = :ets.new(:peer_shard_db, [:bag, :private]) + outbox = :ets.new(:outbox, [:bag, :private]) + + {:ok, %{my_port: my_port, peer_shard_db: peer_shard_db, outbox: outbox} } end def handle_call({:find, shard_id}, _from, state) do - {:reply, state.shards[shard_id], state} + reply = case :ets.lookup(:shard_db, shard_id) do + [{ ^shard_id, _, pid }] -> {:ok, pid} + [] -> :not_found + end + {:reply, reply, state} + end + + def handle_cast({:register, shard_id, manifest, pid}, state) do + will_live = case :ets.lookup(:shard_db, shard_id) do + [{ ^shard_id, _, pid }] -> not Process.alive?(pid) + _ -> true + end + if will_live do + :ets.insert(:shard_db, {shard_id, manifest, pid}) + else + GenServer.cast(pid, {:redundant, shard_id}) + end + {:noreply, state} end - def handle_call(:list, _from, state) do - {:reply, Map.to_list(state.shards), state} + def handle_cast({:interested, peer_id, shards}, state) do + for shard_id <- shards do + case :ets.lookup(:shard_db, shard_id) do + [{ ^shard_id, _, pid }] -> + :ets.insert(state.peer_shard_db, {peer_id, shard_id}) + GenServer.cast(pid, {:interested, peer_id}) + [] -> nil + end + end + {:noreply, state} end - def handle_cast({:register, shard_id, pid}, state) do - if Map.has_key?(state.shards, shard_id) do - GenServer.cast(pid, {:redundant, shard_id}) - state - else - new_shards = Map.put(state.shards, shard_id, pid) - {:noreply, %{ state | shards: new_shards }} + + + def handle_cast({:peer_up, pk, pid, ip, port}, state) do + :ets.insert(:peer_db, {pk, pid, ip, port}) + for {_, msg, _} <- :ets.lookup(state.outbox, pk) do + GenServer.cast(pid, {:send_msg, msg}) end + :ets.delete(state.outbox, pk) + {:noreply, state} + end + + def handle_cast({:peer_down, pk, ip, port}, state) do + :ets.insert(:peer_db, {pk, nil, ip, port}) + {:noreply, state} end - def handle_cast({:interested, peer_id, peer_pid, shards}, state) do - shards - |> Enum.filter(&(Map.has_key?(state.shards, &1))) - |> Enum.each(&(GenServer.cast(state.shards[&1], {:interested, peer_id, peer_pid}))) + def handle_cast({:connect_and_send, peer_id, msg}, state) do + case :ets.lookup(:peer_db, peer_id) do + [{^peer_id, nil, ip, port}] -> + add_peer(ip, port, state) + currtime = System.os_time :second + :ets.insert(state.outbox, {peer_id, msg, currtime}) + outbox_cleanup = :ets.fun2ms(fn {_, _, t} when t < currtime - 60 -> true end) + :ets.select_delete(state.outbox, outbox_cleanup) + _ -> nil + end {:noreply, state} end - def handle_cast({:dispatch, peer_id, peer_pid, shard, msg}, state) do - if Map.has_key?(state.shards, shard) do - GenServer.cast(state.shards[shard], {:msg, peer_id, peer_pid, msg}) + def handle_cast({:try_connect, pk_list}, state) do + for pk <- pk_list do + case :ets.lookup(:peer_db, pk) do + [{^pk, nil, ip, port}] -> + add_peer(ip, port, state) + _ -> nil + end end {:noreply, state} end + + def handle_cast({:add_peer, ip, port}, state) do + add_peer(ip, port, state) + {:noreply, state} + end + + defp add_peer(ip, port, state) do + spawn fn -> + {:ok, client} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) + {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}}) + :ok = :gen_tcp.controlling_process(client, pid) + end + end + + def add_peer(ip, port) do + GenServer.cast(__MODULE__, {:add_peer, ip, port}) + end + + def send(peer_id, msg) do + case :ets.lookup(:peer_db, peer_id) do + [{ ^peer_id, pid, _, _}] when pid != nil-> + GenServer.cast(pid, {:send_msg, msg}) + _ -> + GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg}) + end + end + + def dispatch(peer_id, shard_id, msg) do + case :ets.lookup(:shard_db, shard_id) do + [{ ^shard_id, _, pid }] when pid != nil -> + GenServer.cast(pid, {:msg, peer_id, msg}) + [_] -> nil # TODO restart shard + [] -> nil # TODO send not interested + end + end end |