diff options
author | Alex Auvolat <alex@adnab.me> | 2018-07-19 17:08:23 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2018-07-19 17:08:23 +0200 |
commit | 058bab0d7097405126566360308ace986c18ff8e (patch) | |
tree | eaf3ca0d607829af3ad07bdb51bb170b70f8eef5 | |
parent | 582f1d65463f8f5cbcc34c6129670b473793c4dd (diff) | |
download | shard-058bab0d7097405126566360308ace986c18ff8e.tar.gz shard-058bab0d7097405126566360308ace986c18ff8e.zip |
Refactoring ; template for block store
-rw-r--r-- | lib/app/blockstore.ex_ | 78 | ||||
-rw-r--r-- | lib/app/chat.ex | 37 | ||||
-rw-r--r-- | lib/application.ex | 1 | ||||
-rw-r--r-- | lib/cli/cli.ex | 13 | ||||
-rw-r--r-- | lib/manager.ex | 146 | ||||
-rw-r--r-- | lib/net/manager.ex | 65 | ||||
-rw-r--r-- | lib/net/tcpconn.ex | 10 | ||||
-rw-r--r-- | lib/web/httprouter.ex | 2 |
8 files changed, 229 insertions, 123 deletions
diff --git a/lib/app/blockstore.ex_ b/lib/app/blockstore.ex_ new file mode 100644 index 0000000..2854161 --- /dev/null +++ b/lib/app/blockstore.ex_ @@ -0,0 +1,78 @@ +defmodule SApp.BlockStore do + @moduledoc """ + A module that implements a content-adressable storage (blocks, or pages, + identified by the hash of their contents). + + Establishes full node connectivity and uses rendez-vous hashing to select + which nodes are responsible of a given hash. + + TODO: WIP + """ + + use GenServer + + defmodule State do + defstruct [:name, :id, :manifest, + :ncopies, + :store, :peers] + end + + + def start_link(name) do + GenServer.start_link(__MODULE__, name) + end + + def init(name) do + manifest = {:blockstore, name} + id = SData.term_hash manifest + + GenServer.cast(Shard.Manager, {:register, id, self()}) + GenServer.cast(self(), :init_pull) + + {:ok, %State{name: name, id: id, manifest: manifest, + ncopies: 3, + store: %{}, peers: %{}}} + end + + def handle_call(:manifest, _from, state) do + {:reply, state.manifest, state} + end + + def handle_call({:get, key}, from, state) do + # TODO + end + + def handle_call({:put, val}, state) do + # TODO + end + + def handle_cast({:redundant, _}, _state) do + exit :normal + end + + def handle_cast(:init_pull, state) do + GenServer.call(SNet.Manager, :get_all) + |> Enum.each(&(GenServer.cast(&1, {:send_msg, {:interested, [state.id]}}))) + {:noreply, state} + end + + def handle_cast({:interested, peer_id, peer_pid}, state) do + new_peers = Map.put(state.peers, peer_id, peer_pid) + new_state = %{ state | peers: new_peers } + initial_sync(new_state, peer_id, peer_pid) + {:noreply, new_state} + end + + def handle_cast({:msg, peer_id, peer_pid, msg}, state) do + # TODO + {:noreply, state} + end + + defp initial_sync(state, peer_id, peer_pid) do + # TODO + end + + defp send(state, to, msg) do + GenServer.cast(to, {:send_msg, {state.id, msg}}) + end +end diff --git a/lib/app/chat.ex b/lib/app/chat.ex index fe2777e..696e53c 100644 --- a/lib/app/chat.ex +++ b/lib/app/chat.ex @@ -34,10 +34,10 @@ defmodule SApp.Chat do manifest = {:chat, channel} id = SData.term_hash manifest - GenServer.cast(Shard.Manager, {:register, id, self()}) + GenServer.cast(Shard.Manager, {:register, id, manifest, self()}) GenServer.cast(self(), :init_pull) - {:ok, %{channel: channel, id: id, manifest: manifest, store: store, peers: %{}}} + {:ok, %{channel: channel, id: id, manifest: manifest, store: store, peers: MapSet.new}} end @doc """ @@ -61,8 +61,9 @@ defmodule SApp.Chat do send data for this channel if they have some. """ def handle_cast(:init_pull, state) do - GenServer.call(SNet.Manager, :get_all) - |> Enum.each(&(GenServer.cast(&1, {:send_msg, {:interested, [state.id]}}))) + for {_, pid, _, _} <- :ets.tab2list(:peer_db) do + GenServer.cast(pid, {:send_msg, {:interested, [state.id]}}) + end {:noreply, state} end @@ -77,8 +78,8 @@ defmodule SApp.Chat do msg} GenServer.cast(state.store, {:insert, msgitem}) - for {_, pid} <- state.peers do - push_messages(state, pid, nil, 5) + for peer <- state.peers do + push_messages(state, peer, nil, 5) end {:noreply, state} @@ -88,9 +89,9 @@ defmodule SApp.Chat do Implementation of the :interested handler, this is called when a peer we are connected to asks to recieve data for this channel. """ - def handle_cast({:interested, peer_id, peer_pid}, state) do - push_messages(state, peer_pid, nil, 10) - new_peers = Map.put(state.peers, peer_id, peer_pid) + def handle_cast({:interested, peer_id}, state) do + push_messages(state, peer_id, nil, 10) + new_peers = MapSet.put(state.peers, peer_id) {:noreply, %{ state | peers: new_peers }} end @@ -103,13 +104,14 @@ defmodule SApp.Chat do - `{:info, start, list, rest}`: put some messages and informs of the Merkle hash of the store of older messages. """ - def handle_cast({:msg, peer_id, peer_pid, msg}, state) do + def handle_cast({:msg, peer_id, msg}, state) do case msg do - {:get_manifest} -> send(state, peer_pid, {:manifest, state.manifest}) + {:get_manifest} -> + Shard.Manager.send(peer_id, {state.id, {:manifest, state.manifest}}) {:get, start} -> push_messages(peer_id, state, start, 20) {:info, _start, list, rest} -> if rest != nil and not GenServer.call(state.store, {:has, rest}) do - send(state, peer_pid, {:get, rest}) + Shard.Manager.send(peer_id, {state.id, {:get, rest}}) end spawn_link(fn -> Process.sleep 1000 @@ -118,22 +120,17 @@ defmodule SApp.Chat do _ -> nil end - if Map.has_key?(state.peers, peer_id) do + if MapSet.member?(state.peers, peer_id) do {:noreply, state} else - handle_cast({:interested, peer_id, peer_pid}, state) + handle_cast({:interested, peer_id}, state) end end - defp send(state, to, msg) do - GenServer.cast(to, {:send_msg, {state.id, msg}}) - end - - defp push_messages(state, to, start, num) do case GenServer.call(state.store, {:read, start, num}) do {:ok, list, rest} -> - send(state, to, {:info, start, list, rest}) + Shard.Manager.send(to, {state.id, {:info, start, list, rest}}) _ -> nil end end diff --git a/lib/application.ex b/lib/application.ex index 3ba6e12..5262677 100644 --- a/lib/application.ex +++ b/lib/application.ex @@ -19,7 +19,6 @@ defmodule Shard.Application do { DynamicSupervisor, strategy: :one_for_one, name: Shard.DynamicSupervisor }, # Networking - { SNet.Manager, listen_port }, { SNet.TCPServer, listen_port }, # Applications & data store diff --git a/lib/cli/cli.ex b/lib/cli/cli.ex index d2e5f6a..3f83cb1 100644 --- a/lib/cli/cli.ex +++ b/lib/cli/cli.ex @@ -33,24 +33,23 @@ defmodule SCLI do defp handle_command(pid, ["connect", ipstr, portstr]) do {:ok, ip} = :inet.parse_address (to_charlist ipstr) {port, _} = Integer.parse portstr - SNet.Manager.add_peer(ip, port) + Shard.Manager.add_peer(ip, port) pid end defp handle_command(pid, ["list"]) do IO.puts "List of known channels:" - list = GenServer.call(Shard.Manager, :list) - for {_chid, chpid} <- list do - {:chat, chan} = GenServer.call(chpid, :manifest) + + for {_chid, manifest, _chpid} <- :ets.tab2list(:shard_db) do + {:chat, chan} = manifest IO.puts "##{chan}" end pid end defp handle_command(pid, ["join", qchan]) do - list = GenServer.call(Shard.Manager, :list) - list = for {_chid, chpid} <- list, - {:chat, chan} = GenServer.call(chpid, :manifest), + list = for {_chid, manifest, chpid} <- :ets.tab2list(:shard_db), + {:chat, chan} = manifest, do: {chan, chpid} case List.keyfind(list, qchan, 0) do nil -> diff --git a/lib/manager.ex b/lib/manager.ex index ce11117..07295fa 100644 --- a/lib/manager.ex +++ b/lib/manager.ex @@ -1,46 +1,144 @@ defmodule Shard.Manager do + @moduledoc""" + Maintains three tables : + + - :peer_db + + List of + { id, {conn_pid, con_start, conn_n_msg} | nil, ip, port, last_seen } + + - :shard_db + + List of + { id, manifest, shard_pid } + + - :peer_shard_db + + Mult-list of + { peer_id, shard_id } + + """ + use GenServer - def start_link(_) do - GenServer.start_link(__MODULE__, nil, name: __MODULE__) + def start_link(my_port) do + GenServer.start_link(__MODULE__, my_port, name: __MODULE__) end - def init(_) do - state = %{ - shards: %{} - } - {:ok, state} + def init(my_port) do + :ets.new(:peer_db, [:set, :protected, :named_table]) + :ets.new(:shard_db, [:set, :protected, :named_table]) + peer_shard_db = :ets.new(:peer_shard_db, [:bag, :private]) + outbox = :ets.new(:outbox, [:bag, :private]) + + {:ok, %{my_port: my_port, peer_shard_db: peer_shard_db, outbox: outbox} } end def handle_call({:find, shard_id}, _from, state) do - {:reply, state.shards[shard_id], state} + reply = case :ets.lookup(:shard_db, shard_id) do + [{ ^shard_id, _, pid }] -> {:ok, pid} + [] -> :not_found + end + {:reply, reply, state} + end + + def handle_cast({:register, shard_id, manifest, pid}, state) do + will_live = case :ets.lookup(:shard_db, shard_id) do + [{ ^shard_id, _, pid }] -> not Process.alive?(pid) + _ -> true + end + if will_live do + :ets.insert(:shard_db, {shard_id, manifest, pid}) + else + GenServer.cast(pid, {:redundant, shard_id}) + end + {:noreply, state} end - def handle_call(:list, _from, state) do - {:reply, Map.to_list(state.shards), state} + def handle_cast({:interested, peer_id, shards}, state) do + for shard_id <- shards do + case :ets.lookup(:shard_db, shard_id) do + [{ ^shard_id, _, pid }] -> + :ets.insert(state.peer_shard_db, {peer_id, shard_id}) + GenServer.cast(pid, {:interested, peer_id}) + [] -> nil + end + end + {:noreply, state} end - def handle_cast({:register, shard_id, pid}, state) do - if Map.has_key?(state.shards, shard_id) do - GenServer.cast(pid, {:redundant, shard_id}) - state - else - new_shards = Map.put(state.shards, shard_id, pid) - {:noreply, %{ state | shards: new_shards }} + + + def handle_cast({:peer_up, pk, pid, ip, port}, state) do + :ets.insert(:peer_db, {pk, pid, ip, port}) + for {_, msg, _} <- :ets.lookup(state.outbox, pk) do + GenServer.cast(pid, {:send_msg, msg}) end + :ets.delete(state.outbox, pk) + {:noreply, state} + end + + def handle_cast({:peer_down, pk, ip, port}, state) do + :ets.insert(:peer_db, {pk, nil, ip, port}) + {:noreply, state} end - def handle_cast({:interested, peer_id, peer_pid, shards}, state) do - shards - |> Enum.filter(&(Map.has_key?(state.shards, &1))) - |> Enum.each(&(GenServer.cast(state.shards[&1], {:interested, peer_id, peer_pid}))) + def handle_cast({:connect_and_send, peer_id, msg}, state) do + case :ets.lookup(:peer_db, peer_id) do + [{^peer_id, nil, ip, port}] -> + add_peer(ip, port, state) + currtime = System.os_time :second + :ets.insert(state.outbox, {peer_id, msg, currtime}) + outbox_cleanup = :ets.fun2ms(fn {_, _, t} when t < currtime - 60 -> true end) + :ets.select_delete(state.outbox, outbox_cleanup) + _ -> nil + end {:noreply, state} end - def handle_cast({:dispatch, peer_id, peer_pid, shard, msg}, state) do - if Map.has_key?(state.shards, shard) do - GenServer.cast(state.shards[shard], {:msg, peer_id, peer_pid, msg}) + def handle_cast({:try_connect, pk_list}, state) do + for pk <- pk_list do + case :ets.lookup(:peer_db, pk) do + [{^pk, nil, ip, port}] -> + add_peer(ip, port, state) + _ -> nil + end end {:noreply, state} end + + def handle_cast({:add_peer, ip, port}, state) do + add_peer(ip, port, state) + {:noreply, state} + end + + defp add_peer(ip, port, state) do + spawn fn -> + {:ok, client} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) + {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: state.my_port}}) + :ok = :gen_tcp.controlling_process(client, pid) + end + end + + def add_peer(ip, port) do + GenServer.cast(__MODULE__, {:add_peer, ip, port}) + end + + def send(peer_id, msg) do + case :ets.lookup(:peer_db, peer_id) do + [{ ^peer_id, pid, _, _}] when pid != nil-> + GenServer.cast(pid, {:send_msg, msg}) + _ -> + GenServer.cast(__MODULE__, {:connect_and_send, peer_id, msg}) + end + end + + def dispatch(peer_id, shard_id, msg) do + case :ets.lookup(:shard_db, shard_id) do + [{ ^shard_id, _, pid }] when pid != nil -> + GenServer.cast(pid, {:msg, peer_id, msg}) + [_] -> nil # TODO restart shard + [] -> nil # TODO send not interested + end + end end diff --git a/lib/net/manager.ex b/lib/net/manager.ex deleted file mode 100644 index 4b1ce94..0000000 --- a/lib/net/manager.ex +++ /dev/null @@ -1,65 +0,0 @@ -defmodule SNet.Manager do - use GenServer - - def start_link(my_port) do - GenServer.start_link(__MODULE__, my_port, name: __MODULE__) - end - - def init(my_port) do - state = %{ - peers: %{}, - my_port: my_port - } - {:ok, state} - end - - def handle_cast({:peer_up, pk, pid, addr, ip}, state) do - new_peers = Map.put(state.peers, pk, {pid, addr, ip}) - new_state = %{ state | peers: new_peers } - {:noreply, new_state} - end - - def handle_cast({:peer_down, pk, addr, ip}, state) do - new_peers = Map.put(state.peers, pk, {nil, addr, ip}) - new_state = %{ state | peers: new_peers } - {:noreply, new_state} - end - - def handle_cast({:add_peer, ip, port}, state) do - add_peer(ip, port, state.my_port) - {:noreply, state} - end - - def handle_cast({:try_connect, pk_list}, state) do - for pk <- pk_list do - case state.peers[pk] do - {nil, ip, port} -> add_peer(ip, port) - _ -> nil - end - end - {:noreply, state} - end - - def handle_call(:get_all, _from, state) do - pid_list = (for {_, {pid, _, _}} <- state.peers, pid != nil, do: pid) - {:reply, pid_list, state} - end - - def handle_call({:get_connections, pk_list}, _from, state) do - pid_list = (for pk <- pk_list, Map.has_key?(state.peers, pk), do: state.peers[pk]) - |> Enum.map(fn {pid, _, _} -> pid end) - |> Enum.filter(&(&1 != nil)) - {:ok, pid_list, state} - end - - def add_peer(ip, port) do - GenServer.cast(__MODULE__, {:add_peer, ip, port}) - end - - def add_peer(ip, port, my_port) do - {:ok, client} = :gen_tcp.connect(ip, port, [:binary, packet: 2, active: false]) - {:ok, pid} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SNet.TCPConn, %{socket: client, my_port: my_port}}) - :ok = :gen_tcp.controlling_process(client, pid) - pid - end -end diff --git a/lib/net/tcpconn.ex b/lib/net/tcpconn.ex index 64c85e9..a16a62e 100644 --- a/lib/net/tcpconn.ex +++ b/lib/net/tcpconn.ex @@ -53,7 +53,7 @@ defmodule SNet.TCPConn do addr: addr, port: port } - GenServer.cast(SNet.Manager, {:peer_up, cli_pkey, self(), addr, his_port}) + GenServer.cast(Shard.Manager, {:peer_up, cli_pkey, self(), addr, his_port}) Logger.info "New peer: #{print_id state} at #{inspect addr}:#{port}" GenServer.cast(self(), :init_pull) @@ -66,7 +66,7 @@ defmodule SNet.TCPConn do end def handle_cast(:init_pull, state) do - id_list = (for {id, _} <- GenServer.call(Shard.Manager, :list), do: id) + id_list = (for {id, _, _} <- :ets.tab2list(:shard_db), do: id) send_msg(state, {:interested, id_list}) {:noreply, state} end @@ -98,16 +98,16 @@ defmodule SNet.TCPConn do def handle_info({:tcp_closed, _socket}, state) do Logger.info "Disconnected: #{print_id state} at #{inspect state.addr}:#{state.port}" - GenServer.cast(SNet.Manager, {:peer_down, state.his_pkey, state.addr, state.port}) + GenServer.cast(Shard.Manager, {:peer_down, state.his_pkey, state.addr, state.port}) exit(:normal) end defp handle_packet({:interested, shards}, state) do - GenServer.cast(Shard.Manager, {:interested, state.his_pkey, self(), shards}) + GenServer.cast(Shard.Manager, {:interested, state.his_pkey, shards}) end defp handle_packet({shard, msg}, state) do - GenServer.cast(Shard.Manager, {:dispatch, state.his_pkey, self(), shard, msg}) + Shard.Manager.dispatch(state.his_pkey, shard, msg) end defp print_id(state) do diff --git a/lib/web/httprouter.ex b/lib/web/httprouter.ex index d37413d..57af9f9 100644 --- a/lib/web/httprouter.ex +++ b/lib/web/httprouter.ex @@ -22,7 +22,7 @@ defmodule SWeb.HTTPRouter do [ipstr, portstr] = String.split(conn.params["peer"], ":") {:ok, ip} = :inet.parse_address (to_charlist ipstr) {port, _} = Integer.parse portstr - SNet.Manager.add_peer(ip, port) + Shard.Manager.add_peer(ip, port) end main_page(conn) |