diff options
author | Alex Auvolat <alex@adnab.me> | 2018-10-11 18:02:03 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2018-10-11 18:02:03 +0200 |
commit | d164046678b5e10e23439a77fde75101a9e498d6 (patch) | |
tree | 5e0fe4b0a1f0a99d4c233a134c49baa993dcff6b /shard/lib | |
parent | 28c9cac2e92011e2f7e6f23e93e8a5d9480a0922 (diff) | |
download | shard-d164046678b5e10e23439a77fde75101a9e498d6.tar.gz shard-d164046678b5e10e23439a77fde75101a9e498d6.zip |
Netgroup enforcement for pagestore
Diffstat (limited to 'shard/lib')
-rw-r--r-- | shard/lib/app/chat.ex | 14 | ||||
-rw-r--r-- | shard/lib/app/identity.ex | 6 | ||||
-rw-r--r-- | shard/lib/app/pagestore.ex | 131 |
3 files changed, 76 insertions, 75 deletions
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex index f874e86..35ecdbf 100644 --- a/shard/lib/app/chat.ex +++ b/shard/lib/app/chat.ex @@ -74,8 +74,14 @@ defmodule SApp.Chat do case Shard.Manager.register(id, manifest, self()) do :ok -> + netgroup = case manifest do + %Manifest{channel: _channel} -> + %SNet.PubShardGroup{id: id} + %PrivChat.Manifest{pk_list: pk_list} -> + %SNet.PrivGroup{pk_list: pk_list} + end Shard.Manager.dispatch_to(id, nil, self()) - {:ok, page_store} = SApp.PageStore.start_link(id, :page_store) + {:ok, page_store} = SApp.PageStore.start_link(id, :page_store, netgroup) root = Shard.Manager.load_state id root = cond do root == nil -> nil @@ -87,12 +93,6 @@ defmodule SApp.Chat do mst = %MST{store: %SApp.PageStore{pid: page_store}, cmp: &msg_cmp/2, root: root} - netgroup = case manifest do - %Manifest{channel: _channel} -> - %SNet.PubShardGroup{id: id} - %PrivChat.Manifest{pk_list: pk_list} -> - %SNet.PrivGroup{pk_list: pk_list} - end SNet.Group.init_lookup(netgroup, self()) {:ok, %{id: id, diff --git a/shard/lib/app/identity.ex b/shard/lib/app/identity.ex index 390ef6d..06bc225 100644 --- a/shard/lib/app/identity.ex +++ b/shard/lib/app/identity.ex @@ -56,8 +56,10 @@ defmodule SApp.Identity do id = SData.term_hash manifest case Shard.Manager.find_proc id do nil -> - {:ok, pid} = Shard.Manifest.start manifest - pid + case Shard.Manifest.start manifest do + {:ok, pid} -> pid + {:error, :redundant} -> find_proc(pk) + end pid -> pid end end diff --git a/shard/lib/app/pagestore.ex b/shard/lib/app/pagestore.ex index ad18eac..86b0726 100644 --- a/shard/lib/app/pagestore.ex +++ b/shard/lib/app/pagestore.ex @@ -25,15 +25,15 @@ defmodule SApp.PageStore do @max_failures 4 # Maximum of peers that reply not_found before we abandon defmodule State do - defstruct [:shard_id, :path, :store, :reqs, :retries] + defstruct [:shard_id, :path, :netgroup, :store, :reqs, :retries] end - def start_link(shard_id, path) do - GenServer.start_link(__MODULE__, [shard_id, path]) + def start_link(shard_id, path, netgroup) do + GenServer.start_link(__MODULE__, [shard_id, path, netgroup]) end - def init([shard_id, path]) do + def init([shard_id, path, netgroup]) do Shard.Manager.dispatch_to(shard_id, path, self()) store_path = [Application.get_env(:shard, :data_path), "#{shard_id|>Base.encode16}.#{path}"] |> Path.join |> String.to_atom @@ -41,7 +41,7 @@ defmodule SApp.PageStore do Process.send_after(self(), :clean_cache, 1000) - {:ok, %State{shard_id: shard_id, path: path, store: store, reqs: %{}, retries: %{}}} + {:ok, %State{shard_id: shard_id, path: path, netgroup: netgroup, store: store, reqs: %{}, retries: %{}}} end @@ -123,70 +123,74 @@ defmodule SApp.PageStore do {:noreply, state} end - def handle_cast({:msg, conn_pid, _auth, _shard_id, _path, msg}, state) do - state = case msg do - {:get, key} -> - case :dets.lookup state.store, key do - [{_, _, bin}] -> - SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:info, key, bin}}) - _ -> - SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:not_found, key}}) - end - state - {:info, hash, bin} -> - already_have_it = case :dets.lookup state.store, hash do - [{_, _, _}] -> true - _ -> false - end - if SData.bin_hash(bin) == hash and not already_have_it do - reqs = case state.reqs[hash] do - nil -> state.reqs - pids -> - for pid <- pids do - GenServer.reply(pid, bin) - end - Map.delete(state.reqs, hash) + def handle_cast({:msg, conn_pid, auth, _shard_id, _path, msg}, state) do + if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do + state + else + state = case msg do + {:get, key} -> + case :dets.lookup state.store, key do + [{_, _, bin}] -> + SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:info, key, bin}}) + _ -> + SNet.Manager.send_pid(conn_pid, {state.shard_id, state.path, {:not_found, key}}) end - state = %{state | reqs: reqs, retries: Map.delete(state.retries, hash)} - rec_why = store_put(state, hash, bin) - if rec_why != nil do - sub_why = case rec_why do - {:cached, ttl} -> {:cached, ttl} - _ -> {:req_by, hash} + state + {:info, hash, bin} -> + already_have_it = case :dets.lookup state.store, hash do + [{_, _, _}] -> true + _ -> false + end + if SData.bin_hash(bin) == hash and not already_have_it do + reqs = case state.reqs[hash] do + nil -> state.reqs + pids -> + for pid <- pids do + GenServer.reply(pid, bin) + end + Map.delete(state.reqs, hash) end - value = SData.term_unbin bin - for dep <- SData.Page.refs value do - if :dets.lookup state.store, dep == [] do - init_rec_pull(state, dep, sub_why, [conn_pid]) + state = %{state | reqs: reqs, retries: Map.delete(state.retries, hash)} + rec_why = store_put(state, hash, bin) + if rec_why != nil do + sub_why = case rec_why do + {:cached, ttl} -> {:cached, ttl} + _ -> {:req_by, hash} + end + value = SData.term_unbin bin + for dep <- SData.Page.refs value do + if :dets.lookup state.store, dep == [] do + init_rec_pull(state, dep, sub_why, [conn_pid]) + end end end - end - state - else - state - end - {:not_found, key} -> - if state.reqs[key] != nil do - nretry = case state.retries[key] do - nil -> 1 - n -> n+1 - end - if nretry < @max_failures do - ask_random_peers(state, key) - %{state | retries: Map.put(state.retries, key, nretry)} + state else - for pid <- state.reqs[key] do - GenServer.reply(pid, nil) + state + end + {:not_found, key} -> + if state.reqs[key] != nil do + nretry = case state.retries[key] do + nil -> 1 + n -> n+1 end - state = %{state | reqs: Map.delete(state.reqs, key)} - state = %{state | retries: Map.delete(state.retries, key)} + if nretry < @max_failures do + ask_random_peers(state, key) + %{state | retries: Map.put(state.retries, key, nretry)} + else + for pid <- state.reqs[key] do + GenServer.reply(pid, nil) + end + state = %{state | reqs: Map.delete(state.reqs, key)} + state = %{state | retries: Map.delete(state.retries, key)} + state + end + else state end - else - state - end + end + {:noreply, state} end - {:noreply, state} end def handle_cast({:set_roots, roots}, state) do @@ -247,12 +251,7 @@ defmodule SApp.PageStore do end def ask_random_peers(state, key) do - peers = Shard.Manager.get_shard_peers(state.shard_id) - |> Enum.shuffle - |> Enum.take(3) - for peer <- peers do - SNet.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) - end + SNet.Group.broadcast(state.netgroup, {state.shard_id, state.path, {:get, key}}, 3) end defimpl SData.PageStore do |