aboutsummaryrefslogtreecommitdiff
path: root/shard/lib/app/pagestore.ex
diff options
context:
space:
mode:
Diffstat (limited to 'shard/lib/app/pagestore.ex')
-rw-r--r--shard/lib/app/pagestore.ex131
1 files changed, 65 insertions, 66 deletions
diff --git a/shard/lib/app/pagestore.ex b/shard/lib/app/pagestore.ex
index ad18eac..86b0726 100644
--- a/shard/lib/app/pagestore.ex
+++ b/shard/lib/app/pagestore.ex
@@ -25,15 +25,15 @@ defmodule SApp.PageStore do
@max_failures 4 # Maximum of peers that reply not_found before we abandon
defmodule State do
- defstruct [:shard_id, :path, :store, :reqs, :retries]
+ defstruct [:shard_id, :path, :netgroup, :store, :reqs, :retries]
end
- def start_link(shard_id, path) do
- GenServer.start_link(__MODULE__, [shard_id, path])
+ def start_link(shard_id, path, netgroup) do
+ GenServer.start_link(__MODULE__, [shard_id, path, netgroup])
end
- def init([shard_id, path]) do
+ def init([shard_id, path, netgroup]) do
Shard.Manager.dispatch_to(shard_id, path, self())
store_path = [Application.get_env(:shard, :data_path), "#{shard_id|>Base.encode16}.#{path}"] |> Path.join |> String.to_atom
@@ -41,7 +41,7 @@ defmodule SApp.PageStore do
Process.send_after(self(), :clean_cache, 1000)
- {:ok, %State{shard_id: shard_id, path: path, store: store, reqs: %{}, retries: %{}}}
+ {:ok, %State{shard_id: shard_id, path: path, netgroup: netgroup, store: store, reqs: %{}, retries: %{}}}
end
@@ -123,70 +123,74 @@ defmodule SApp.PageStore do
{:noreply, state}
end
- def handle_cast({:msg, conn_pid, _auth, _shard_id, _path, msg}, state) do
- state = case msg do
- {:get, key} ->
- case :dets.lookup state.store, key do
- [{_, _, bin}] ->
- SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:info, key, bin}})
- _ ->
- SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:not_found, key}})
- end
- state
- {:info, hash, bin} ->
- already_have_it = case :dets.lookup state.store, hash do
- [{_, _, _}] -> true
- _ -> false
- end
- if SData.bin_hash(bin) == hash and not already_have_it do
- reqs = case state.reqs[hash] do
- nil -> state.reqs
- pids ->
- for pid <- pids do
- GenServer.reply(pid, bin)
- end
- Map.delete(state.reqs, hash)
+ def handle_cast({:msg, conn_pid, auth, _shard_id, _path, msg}, state) do
+ if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
+ state
+ else
+ state = case msg do
+ {:get, key} ->
+ case :dets.lookup state.store, key do
+ [{_, _, bin}] ->
+ SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:info, key, bin}})
+ _ ->
+ SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:not_found, key}})
end
- state = %{state | reqs: reqs, retries: Map.delete(state.retries, hash)}
- rec_why = store_put(state, hash, bin)
- if rec_why != nil do
- sub_why = case rec_why do
- {:cached, ttl} -> {:cached, ttl}
- _ -> {:req_by, hash}
+ state
+ {:info, hash, bin} ->
+ already_have_it = case :dets.lookup state.store, hash do
+ [{_, _, _}] -> true
+ _ -> false
+ end
+ if SData.bin_hash(bin) == hash and not already_have_it do
+ reqs = case state.reqs[hash] do
+ nil -> state.reqs
+ pids ->
+ for pid <- pids do
+ GenServer.reply(pid, bin)
+ end
+ Map.delete(state.reqs, hash)
end
- value = SData.term_unbin bin
- for dep <- SData.Page.refs value do
- if :dets.lookup state.store, dep == [] do
- init_rec_pull(state, dep, sub_why, [conn_pid])
+ state = %{state | reqs: reqs, retries: Map.delete(state.retries, hash)}
+ rec_why = store_put(state, hash, bin)
+ if rec_why != nil do
+ sub_why = case rec_why do
+ {:cached, ttl} -> {:cached, ttl}
+ _ -> {:req_by, hash}
+ end
+ value = SData.term_unbin bin
+ for dep <- SData.Page.refs value do
+ if :dets.lookup state.store, dep == [] do
+ init_rec_pull(state, dep, sub_why, [conn_pid])
+ end
end
end
- end
- state
- else
- state
- end
- {:not_found, key} ->
- if state.reqs[key] != nil do
- nretry = case state.retries[key] do
- nil -> 1
- n -> n+1
- end
- if nretry < @max_failures do
- ask_random_peers(state, key)
- %{state | retries: Map.put(state.retries, key, nretry)}
+ state
else
- for pid <- state.reqs[key] do
- GenServer.reply(pid, nil)
+ state
+ end
+ {:not_found, key} ->
+ if state.reqs[key] != nil do
+ nretry = case state.retries[key] do
+ nil -> 1
+ n -> n+1
end
- state = %{state | reqs: Map.delete(state.reqs, key)}
- state = %{state | retries: Map.delete(state.retries, key)}
+ if nretry < @max_failures do
+ ask_random_peers(state, key)
+ %{state | retries: Map.put(state.retries, key, nretry)}
+ else
+ for pid <- state.reqs[key] do
+ GenServer.reply(pid, nil)
+ end
+ state = %{state | reqs: Map.delete(state.reqs, key)}
+ state = %{state | retries: Map.delete(state.retries, key)}
+ state
+ end
+ else
state
end
- else
- state
- end
+ end
+ {:noreply, state}
end
- {:noreply, state}
end
def handle_cast({:set_roots, roots}, state) do
@@ -247,12 +251,7 @@ defmodule SApp.PageStore do
end
def ask_random_peers(state, key) do
- peers = Shard.Manager.get_shard_peers(state.shard_id)
- |> Enum.shuffle
- |> Enum.take(3)
- for peer <- peers do
- SNet.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
- end
+ SNet.Group.broadcast(state.netgroup, {state.shard_id, state.path, {:get, key}}, 3)
end
defimpl SData.PageStore do