aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2018-10-11 18:02:03 +0200
committerAlex Auvolat <alex@adnab.me>2018-10-11 18:02:03 +0200
commitd164046678b5e10e23439a77fde75101a9e498d6 (patch)
tree5e0fe4b0a1f0a99d4c233a134c49baa993dcff6b
parent28c9cac2e92011e2f7e6f23e93e8a5d9480a0922 (diff)
downloadshard-d164046678b5e10e23439a77fde75101a9e498d6.tar.gz
shard-d164046678b5e10e23439a77fde75101a9e498d6.zip
Netgroup enforcement for pagestore
-rw-r--r--shard/lib/app/chat.ex14
-rw-r--r--shard/lib/app/identity.ex6
-rw-r--r--shard/lib/app/pagestore.ex131
3 files changed, 76 insertions, 75 deletions
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex
index f874e86..35ecdbf 100644
--- a/shard/lib/app/chat.ex
+++ b/shard/lib/app/chat.ex
@@ -74,8 +74,14 @@ defmodule SApp.Chat do
case Shard.Manager.register(id, manifest, self()) do
:ok ->
+ netgroup = case manifest do
+ %Manifest{channel: _channel} ->
+ %SNet.PubShardGroup{id: id}
+ %PrivChat.Manifest{pk_list: pk_list} ->
+ %SNet.PrivGroup{pk_list: pk_list}
+ end
Shard.Manager.dispatch_to(id, nil, self())
- {:ok, page_store} = SApp.PageStore.start_link(id, :page_store)
+ {:ok, page_store} = SApp.PageStore.start_link(id, :page_store, netgroup)
root = Shard.Manager.load_state id
root = cond do
root == nil -> nil
@@ -87,12 +93,6 @@ defmodule SApp.Chat do
mst = %MST{store: %SApp.PageStore{pid: page_store},
cmp: &msg_cmp/2,
root: root}
- netgroup = case manifest do
- %Manifest{channel: _channel} ->
- %SNet.PubShardGroup{id: id}
- %PrivChat.Manifest{pk_list: pk_list} ->
- %SNet.PrivGroup{pk_list: pk_list}
- end
SNet.Group.init_lookup(netgroup, self())
{:ok,
%{id: id,
diff --git a/shard/lib/app/identity.ex b/shard/lib/app/identity.ex
index 390ef6d..06bc225 100644
--- a/shard/lib/app/identity.ex
+++ b/shard/lib/app/identity.ex
@@ -56,8 +56,10 @@ defmodule SApp.Identity do
id = SData.term_hash manifest
case Shard.Manager.find_proc id do
nil ->
- {:ok, pid} = Shard.Manifest.start manifest
- pid
+ case Shard.Manifest.start manifest do
+ {:ok, pid} -> pid
+ {:error, :redundant} -> find_proc(pk)
+ end
pid -> pid
end
end
diff --git a/shard/lib/app/pagestore.ex b/shard/lib/app/pagestore.ex
index ad18eac..86b0726 100644
--- a/shard/lib/app/pagestore.ex
+++ b/shard/lib/app/pagestore.ex
@@ -25,15 +25,15 @@ defmodule SApp.PageStore do
@max_failures 4 # Maximum of peers that reply not_found before we abandon
defmodule State do
- defstruct [:shard_id, :path, :store, :reqs, :retries]
+ defstruct [:shard_id, :path, :netgroup, :store, :reqs, :retries]
end
- def start_link(shard_id, path) do
- GenServer.start_link(__MODULE__, [shard_id, path])
+ def start_link(shard_id, path, netgroup) do
+ GenServer.start_link(__MODULE__, [shard_id, path, netgroup])
end
- def init([shard_id, path]) do
+ def init([shard_id, path, netgroup]) do
Shard.Manager.dispatch_to(shard_id, path, self())
store_path = [Application.get_env(:shard, :data_path), "#{shard_id|>Base.encode16}.#{path}"] |> Path.join |> String.to_atom
@@ -41,7 +41,7 @@ defmodule SApp.PageStore do
Process.send_after(self(), :clean_cache, 1000)
- {:ok, %State{shard_id: shard_id, path: path, store: store, reqs: %{}, retries: %{}}}
+ {:ok, %State{shard_id: shard_id, path: path, netgroup: netgroup, store: store, reqs: %{}, retries: %{}}}
end
@@ -123,70 +123,74 @@ defmodule SApp.PageStore do
{:noreply, state}
end
- def handle_cast({:msg, conn_pid, _auth, _shard_id, _path, msg}, state) do
- state = case msg do
- {:get, key} ->
- case :dets.lookup state.store, key do
- [{_, _, bin}] ->
- SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:info, key, bin}})
- _ ->
- SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:not_found, key}})
- end
- state
- {:info, hash, bin} ->
- already_have_it = case :dets.lookup state.store, hash do
- [{_, _, _}] -> true
- _ -> false
- end
- if SData.bin_hash(bin) == hash and not already_have_it do
- reqs = case state.reqs[hash] do
- nil -> state.reqs
- pids ->
- for pid <- pids do
- GenServer.reply(pid, bin)
- end
- Map.delete(state.reqs, hash)
+ def handle_cast({:msg, conn_pid, auth, _shard_id, _path, msg}, state) do
+ if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
+ state
+ else
+ state = case msg do
+ {:get, key} ->
+ case :dets.lookup state.store, key do
+ [{_, _, bin}] ->
+ SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:info, key, bin}})
+ _ ->
+ SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:not_found, key}})
end
- state = %{state | reqs: reqs, retries: Map.delete(state.retries, hash)}
- rec_why = store_put(state, hash, bin)
- if rec_why != nil do
- sub_why = case rec_why do
- {:cached, ttl} -> {:cached, ttl}
- _ -> {:req_by, hash}
+ state
+ {:info, hash, bin} ->
+ already_have_it = case :dets.lookup state.store, hash do
+ [{_, _, _}] -> true
+ _ -> false
+ end
+ if SData.bin_hash(bin) == hash and not already_have_it do
+ reqs = case state.reqs[hash] do
+ nil -> state.reqs
+ pids ->
+ for pid <- pids do
+ GenServer.reply(pid, bin)
+ end
+ Map.delete(state.reqs, hash)
end
- value = SData.term_unbin bin
- for dep <- SData.Page.refs value do
- if :dets.lookup state.store, dep == [] do
- init_rec_pull(state, dep, sub_why, [conn_pid])
+ state = %{state | reqs: reqs, retries: Map.delete(state.retries, hash)}
+ rec_why = store_put(state, hash, bin)
+ if rec_why != nil do
+ sub_why = case rec_why do
+ {:cached, ttl} -> {:cached, ttl}
+ _ -> {:req_by, hash}
+ end
+ value = SData.term_unbin bin
+ for dep <- SData.Page.refs value do
+ if :dets.lookup state.store, dep == [] do
+ init_rec_pull(state, dep, sub_why, [conn_pid])
+ end
end
end
- end
- state
- else
- state
- end
- {:not_found, key} ->
- if state.reqs[key] != nil do
- nretry = case state.retries[key] do
- nil -> 1
- n -> n+1
- end
- if nretry < @max_failures do
- ask_random_peers(state, key)
- %{state | retries: Map.put(state.retries, key, nretry)}
+ state
else
- for pid <- state.reqs[key] do
- GenServer.reply(pid, nil)
+ state
+ end
+ {:not_found, key} ->
+ if state.reqs[key] != nil do
+ nretry = case state.retries[key] do
+ nil -> 1
+ n -> n+1
end
- state = %{state | reqs: Map.delete(state.reqs, key)}
- state = %{state | retries: Map.delete(state.retries, key)}
+ if nretry < @max_failures do
+ ask_random_peers(state, key)
+ %{state | retries: Map.put(state.retries, key, nretry)}
+ else
+ for pid <- state.reqs[key] do
+ GenServer.reply(pid, nil)
+ end
+ state = %{state | reqs: Map.delete(state.reqs, key)}
+ state = %{state | retries: Map.delete(state.retries, key)}
+ state
+ end
+ else
state
end
- else
- state
- end
+ end
+ {:noreply, state}
end
- {:noreply, state}
end
def handle_cast({:set_roots, roots}, state) do
@@ -247,12 +251,7 @@ defmodule SApp.PageStore do
end
def ask_random_peers(state, key) do
- peers = Shard.Manager.get_shard_peers(state.shard_id)
- |> Enum.shuffle
- |> Enum.take(3)
- for peer <- peers do
- SNet.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
- end
+ SNet.Group.broadcast(state.netgroup, {state.shard_id, state.path, {:get, key}}, 3)
end
defimpl SData.PageStore do