From 058bab0d7097405126566360308ace986c18ff8e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 19 Jul 2018 17:08:23 +0200 Subject: Refactoring ; template for block store --- lib/app/blockstore.ex_ | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++ lib/app/chat.ex | 37 +++++++++++------------- 2 files changed, 95 insertions(+), 20 deletions(-) create mode 100644 lib/app/blockstore.ex_ (limited to 'lib/app') diff --git a/lib/app/blockstore.ex_ b/lib/app/blockstore.ex_ new file mode 100644 index 0000000..2854161 --- /dev/null +++ b/lib/app/blockstore.ex_ @@ -0,0 +1,78 @@ +defmodule SApp.BlockStore do + @moduledoc """ + A module that implements a content-adressable storage (blocks, or pages, + identified by the hash of their contents). + + Establishes full node connectivity and uses rendez-vous hashing to select + which nodes are responsible of a given hash. + + TODO: WIP + """ + + use GenServer + + defmodule State do + defstruct [:name, :id, :manifest, + :ncopies, + :store, :peers] + end + + + def start_link(name) do + GenServer.start_link(__MODULE__, name) + end + + def init(name) do + manifest = {:blockstore, name} + id = SData.term_hash manifest + + GenServer.cast(Shard.Manager, {:register, id, self()}) + GenServer.cast(self(), :init_pull) + + {:ok, %State{name: name, id: id, manifest: manifest, + ncopies: 3, + store: %{}, peers: %{}}} + end + + def handle_call(:manifest, _from, state) do + {:reply, state.manifest, state} + end + + def handle_call({:get, key}, from, state) do + # TODO + end + + def handle_call({:put, val}, state) do + # TODO + end + + def handle_cast({:redundant, _}, _state) do + exit :normal + end + + def handle_cast(:init_pull, state) do + GenServer.call(SNet.Manager, :get_all) + |> Enum.each(&(GenServer.cast(&1, {:send_msg, {:interested, [state.id]}}))) + {:noreply, state} + end + + def handle_cast({:interested, peer_id, peer_pid}, state) do + new_peers = Map.put(state.peers, peer_id, peer_pid) + new_state = %{ state | peers: new_peers } + initial_sync(new_state, peer_id, peer_pid) + {:noreply, new_state} + end + + def handle_cast({:msg, peer_id, peer_pid, msg}, state) do + # TODO + {:noreply, state} + end + + defp initial_sync(state, peer_id, peer_pid) do + # TODO + end + + defp send(state, to, msg) do + GenServer.cast(to, {:send_msg, {state.id, msg}}) + end +end diff --git a/lib/app/chat.ex b/lib/app/chat.ex index fe2777e..696e53c 100644 --- a/lib/app/chat.ex +++ b/lib/app/chat.ex @@ -34,10 +34,10 @@ defmodule SApp.Chat do manifest = {:chat, channel} id = SData.term_hash manifest - GenServer.cast(Shard.Manager, {:register, id, self()}) + GenServer.cast(Shard.Manager, {:register, id, manifest, self()}) GenServer.cast(self(), :init_pull) - {:ok, %{channel: channel, id: id, manifest: manifest, store: store, peers: %{}}} + {:ok, %{channel: channel, id: id, manifest: manifest, store: store, peers: MapSet.new}} end @doc """ @@ -61,8 +61,9 @@ defmodule SApp.Chat do send data for this channel if they have some. """ def handle_cast(:init_pull, state) do - GenServer.call(SNet.Manager, :get_all) - |> Enum.each(&(GenServer.cast(&1, {:send_msg, {:interested, [state.id]}}))) + for {_, pid, _, _} <- :ets.tab2list(:peer_db) do + GenServer.cast(pid, {:send_msg, {:interested, [state.id]}}) + end {:noreply, state} end @@ -77,8 +78,8 @@ defmodule SApp.Chat do msg} GenServer.cast(state.store, {:insert, msgitem}) - for {_, pid} <- state.peers do - push_messages(state, pid, nil, 5) + for peer <- state.peers do + push_messages(state, peer, nil, 5) end {:noreply, state} @@ -88,9 +89,9 @@ defmodule SApp.Chat do Implementation of the :interested handler, this is called when a peer we are connected to asks to recieve data for this channel. """ - def handle_cast({:interested, peer_id, peer_pid}, state) do - push_messages(state, peer_pid, nil, 10) - new_peers = Map.put(state.peers, peer_id, peer_pid) + def handle_cast({:interested, peer_id}, state) do + push_messages(state, peer_id, nil, 10) + new_peers = MapSet.put(state.peers, peer_id) {:noreply, %{ state | peers: new_peers }} end @@ -103,13 +104,14 @@ defmodule SApp.Chat do - `{:info, start, list, rest}`: put some messages and informs of the Merkle hash of the store of older messages. """ - def handle_cast({:msg, peer_id, peer_pid, msg}, state) do + def handle_cast({:msg, peer_id, msg}, state) do case msg do - {:get_manifest} -> send(state, peer_pid, {:manifest, state.manifest}) + {:get_manifest} -> + Shard.Manager.send(peer_id, {state.id, {:manifest, state.manifest}}) {:get, start} -> push_messages(peer_id, state, start, 20) {:info, _start, list, rest} -> if rest != nil and not GenServer.call(state.store, {:has, rest}) do - send(state, peer_pid, {:get, rest}) + Shard.Manager.send(peer_id, {state.id, {:get, rest}}) end spawn_link(fn -> Process.sleep 1000 @@ -118,22 +120,17 @@ defmodule SApp.Chat do _ -> nil end - if Map.has_key?(state.peers, peer_id) do + if MapSet.member?(state.peers, peer_id) do {:noreply, state} else - handle_cast({:interested, peer_id, peer_pid}, state) + handle_cast({:interested, peer_id}, state) end end - defp send(state, to, msg) do - GenServer.cast(to, {:send_msg, {state.id, msg}}) - end - - defp push_messages(state, to, start, num) do case GenServer.call(state.store, {:read, start, num}) do {:ok, list, rest} -> - send(state, to, {:info, start, list, rest}) + Shard.Manager.send(to, {state.id, {:info, start, list, rest}}) _ -> nil end end -- cgit v1.2.3