aboutsummaryrefslogtreecommitdiff
path: root/lib/app
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2018-07-19 17:08:23 +0200
committerAlex Auvolat <alex@adnab.me>2018-07-19 17:08:23 +0200
commit058bab0d7097405126566360308ace986c18ff8e (patch)
treeeaf3ca0d607829af3ad07bdb51bb170b70f8eef5 /lib/app
parent582f1d65463f8f5cbcc34c6129670b473793c4dd (diff)
downloadshard-058bab0d7097405126566360308ace986c18ff8e.tar.gz
shard-058bab0d7097405126566360308ace986c18ff8e.zip
Refactoring ; template for block store
Diffstat (limited to 'lib/app')
-rw-r--r--lib/app/blockstore.ex_78
-rw-r--r--lib/app/chat.ex37
2 files changed, 95 insertions, 20 deletions
diff --git a/lib/app/blockstore.ex_ b/lib/app/blockstore.ex_
new file mode 100644
index 0000000..2854161
--- /dev/null
+++ b/lib/app/blockstore.ex_
@@ -0,0 +1,78 @@
+defmodule SApp.BlockStore do
+ @moduledoc """
+ A module that implements a content-adressable storage (blocks, or pages,
+ identified by the hash of their contents).
+
+ Establishes full node connectivity and uses rendez-vous hashing to select
+ which nodes are responsible of a given hash.
+
+ TODO: WIP
+ """
+
+ use GenServer
+
+ defmodule State do
+ defstruct [:name, :id, :manifest,
+ :ncopies,
+ :store, :peers]
+ end
+
+
+ def start_link(name) do
+ GenServer.start_link(__MODULE__, name)
+ end
+
+ def init(name) do
+ manifest = {:blockstore, name}
+ id = SData.term_hash manifest
+
+ GenServer.cast(Shard.Manager, {:register, id, self()})
+ GenServer.cast(self(), :init_pull)
+
+ {:ok, %State{name: name, id: id, manifest: manifest,
+ ncopies: 3,
+ store: %{}, peers: %{}}}
+ end
+
+ def handle_call(:manifest, _from, state) do
+ {:reply, state.manifest, state}
+ end
+
+ def handle_call({:get, key}, from, state) do
+ # TODO
+ end
+
+ def handle_call({:put, val}, state) do
+ # TODO
+ end
+
+ def handle_cast({:redundant, _}, _state) do
+ exit :normal
+ end
+
+ def handle_cast(:init_pull, state) do
+ GenServer.call(SNet.Manager, :get_all)
+ |> Enum.each(&(GenServer.cast(&1, {:send_msg, {:interested, [state.id]}})))
+ {:noreply, state}
+ end
+
+ def handle_cast({:interested, peer_id, peer_pid}, state) do
+ new_peers = Map.put(state.peers, peer_id, peer_pid)
+ new_state = %{ state | peers: new_peers }
+ initial_sync(new_state, peer_id, peer_pid)
+ {:noreply, new_state}
+ end
+
+ def handle_cast({:msg, peer_id, peer_pid, msg}, state) do
+ # TODO
+ {:noreply, state}
+ end
+
+ defp initial_sync(state, peer_id, peer_pid) do
+ # TODO
+ end
+
+ defp send(state, to, msg) do
+ GenServer.cast(to, {:send_msg, {state.id, msg}})
+ end
+end
diff --git a/lib/app/chat.ex b/lib/app/chat.ex
index fe2777e..696e53c 100644
--- a/lib/app/chat.ex
+++ b/lib/app/chat.ex
@@ -34,10 +34,10 @@ defmodule SApp.Chat do
manifest = {:chat, channel}
id = SData.term_hash manifest
- GenServer.cast(Shard.Manager, {:register, id, self()})
+ GenServer.cast(Shard.Manager, {:register, id, manifest, self()})
GenServer.cast(self(), :init_pull)
- {:ok, %{channel: channel, id: id, manifest: manifest, store: store, peers: %{}}}
+ {:ok, %{channel: channel, id: id, manifest: manifest, store: store, peers: MapSet.new}}
end
@doc """
@@ -61,8 +61,9 @@ defmodule SApp.Chat do
send data for this channel if they have some.
"""
def handle_cast(:init_pull, state) do
- GenServer.call(SNet.Manager, :get_all)
- |> Enum.each(&(GenServer.cast(&1, {:send_msg, {:interested, [state.id]}})))
+ for {_, pid, _, _} <- :ets.tab2list(:peer_db) do
+ GenServer.cast(pid, {:send_msg, {:interested, [state.id]}})
+ end
{:noreply, state}
end
@@ -77,8 +78,8 @@ defmodule SApp.Chat do
msg}
GenServer.cast(state.store, {:insert, msgitem})
- for {_, pid} <- state.peers do
- push_messages(state, pid, nil, 5)
+ for peer <- state.peers do
+ push_messages(state, peer, nil, 5)
end
{:noreply, state}
@@ -88,9 +89,9 @@ defmodule SApp.Chat do
Implementation of the :interested handler, this is called when a peer we are
connected to asks to recieve data for this channel.
"""
- def handle_cast({:interested, peer_id, peer_pid}, state) do
- push_messages(state, peer_pid, nil, 10)
- new_peers = Map.put(state.peers, peer_id, peer_pid)
+ def handle_cast({:interested, peer_id}, state) do
+ push_messages(state, peer_id, nil, 10)
+ new_peers = MapSet.put(state.peers, peer_id)
{:noreply, %{ state | peers: new_peers }}
end
@@ -103,13 +104,14 @@ defmodule SApp.Chat do
- `{:info, start, list, rest}`: put some messages and informs of the
Merkle hash of the store of older messages.
"""
- def handle_cast({:msg, peer_id, peer_pid, msg}, state) do
+ def handle_cast({:msg, peer_id, msg}, state) do
case msg do
- {:get_manifest} -> send(state, peer_pid, {:manifest, state.manifest})
+ {:get_manifest} ->
+ Shard.Manager.send(peer_id, {state.id, {:manifest, state.manifest}})
{:get, start} -> push_messages(peer_id, state, start, 20)
{:info, _start, list, rest} ->
if rest != nil and not GenServer.call(state.store, {:has, rest}) do
- send(state, peer_pid, {:get, rest})
+ Shard.Manager.send(peer_id, {state.id, {:get, rest}})
end
spawn_link(fn ->
Process.sleep 1000
@@ -118,22 +120,17 @@ defmodule SApp.Chat do
_ -> nil
end
- if Map.has_key?(state.peers, peer_id) do
+ if MapSet.member?(state.peers, peer_id) do
{:noreply, state}
else
- handle_cast({:interested, peer_id, peer_pid}, state)
+ handle_cast({:interested, peer_id}, state)
end
end
- defp send(state, to, msg) do
- GenServer.cast(to, {:send_msg, {state.id, msg}})
- end
-
-
defp push_messages(state, to, start, num) do
case GenServer.call(state.store, {:read, start, num}) do
{:ok, list, rest} ->
- send(state, to, {:info, start, list, rest})
+ Shard.Manager.send(to, {state.id, {:info, start, list, rest}})
_ -> nil
end
end