From 1df3aa74a6870f276bead1ed5650f0d86355ce09 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 26 Sep 2018 10:33:23 +0200 Subject: Persistency --- shard/lib/app/chat.ex | 6 +++++- shard/lib/app/pagestore.ex | 50 +++++++++++++++++++++++----------------------- shard/lib/manager.ex | 33 ++++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 26 deletions(-) (limited to 'shard') diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex index fa62c9e..db2cb64 100644 --- a/shard/lib/app/chat.ex +++ b/shard/lib/app/chat.ex @@ -52,7 +52,8 @@ defmodule SApp.Chat do Shard.Manager.dispatch_to(id, nil, self()) {:ok, page_store} = SApp.PageStore.start_link(id, :page_store) mst = %MST{store: %SApp.PageStore{pid: page_store}, - cmp: &msg_cmp/2} + cmp: &msg_cmp/2, + root: Shard.Manager.load_state(id)} GenServer.cast(self(), :init_pull) {:ok, %{channel: channel, @@ -104,6 +105,7 @@ defmodule SApp.Chat do prev_root = state.mst.root mst = MST.insert(state.mst, msgitem) state = %{state | mst: mst} + Shard.Manager.save_state(state.id, mst.root) for pid <- state.subs do if Process.alive?(pid) do @@ -160,6 +162,7 @@ defmodule SApp.Chat do if mst2.root == new_root do # This was the only message missing, we are happy! state = %{state | mst: mst2} + Shard.Manager.save_state(state.id, mst2.root) GenServer.cast(state.page_store, {:set_roots, [mst2.root]}) msg_callback(state, msgitem) state @@ -200,6 +203,7 @@ defmodule SApp.Chat do end GenServer.cast(state.page_store, {:set_roots, [mst.root]}) + Shard.Manager.save_state(state.id, mst.root) %{state | mst: mst} end diff --git a/shard/lib/app/pagestore.ex b/shard/lib/app/pagestore.ex index 9944544..7962084 100644 --- a/shard/lib/app/pagestore.ex +++ b/shard/lib/app/pagestore.ex @@ -36,17 +36,17 @@ defmodule SApp.PageStore do def init([shard_id, path]) do Shard.Manager.dispatch_to(shard_id, path, self()) - store_path = String.to_atom "#{Base.encode16 shard_id}/#{Atom.to_string path}" - store = :ets.new store_path, [:set, :protected] + store_path = [Application.get_env(:shard, :data_path), "#{shard_id|>Base.encode16}.#{path}"] |> Path.join |> String.to_atom + {:ok, store} = :dets.open_file store_path, [type: :set] - Process.send_after(self(), :clean_cache, @clean_cache_every * 1000) + Process.send_after(self(), :clean_cache, 1000) {:ok, %State{shard_id: shard_id, path: path, store: store, reqs: %{}, retries: %{}}} end def handle_call({:get, key, prefer_ask}, from, state) do - case :ets.lookup state.store, key do + case :dets.lookup state.store, key do [{_, _, bin}] -> {:reply, bin, state} [{_, _}] -> @@ -77,12 +77,12 @@ defmodule SApp.PageStore do end defp store_put(state, hash, bin) do - case :ets.lookup state.store, hash do + case :dets.lookup state.store, hash do [] -> - :ets.insert state.store, {hash, {:cached, System.os_time(:seconds) + @cache_ttl}, bin} + :dets.insert state.store, {hash, {:cached, System.os_time(:seconds) + @cache_ttl}, bin} nil [{_, why}] -> - :ets.insert state.store, {hash, why, bin} + :dets.insert state.store, {hash, why, bin} why [{_, _, _}] -> nil @@ -91,18 +91,18 @@ defmodule SApp.PageStore do defp init_rec_pull(state, key, why, prefer_ask) do case prefer_ask do - [] -> - ask_random_peers(state, key) - _ -> + [_|_] -> for peer <- prefer_ask do Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) end + _ -> + ask_random_peers(state, key) end - :ets.insert state.store, {key, why} + :dets.insert state.store, {key, why} end def handle_cast({:rec_pull, hash, ask_to}, state) do - if :ets.lookup state.store, hash == [] do + if :dets.lookup state.store, hash == [] do why = {:cached, System.os_time(:seconds) + @cache_ttl} init_rec_pull(state, hash, why, ask_to) end @@ -112,7 +112,7 @@ defmodule SApp.PageStore do def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do state = case msg do {:get, key} -> - case :ets.lookup state.store, key do + case :dets.lookup state.store, key do [{_, _, bin}] -> Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, bin}}) _ -> @@ -120,7 +120,7 @@ defmodule SApp.PageStore do end state {:info, hash, bin} -> - already_have_it = case :ets.lookup state.store, hash do + already_have_it = case :dets.lookup state.store, hash do [{_, _, _}] -> true _ -> false end @@ -142,7 +142,7 @@ defmodule SApp.PageStore do end value = SData.term_unbin bin for dep <- SData.Page.refs value do - if :ets.lookup state.store, dep == [] do + if :dets.lookup state.store, dep == [] do init_rec_pull(state, dep, sub_why, [peer_id]) end end @@ -179,27 +179,27 @@ defmodule SApp.PageStore do cached_why = {:cached, System.os_time(:seconds) + @cache_ttl} # Set old roots and their deps as cached - for ent <- :ets.select state.store, [{ {:"$1", :root, :"$2"}, [], [:"$$"] }, + for ent <- :dets.select state.store, [{ {:"$1", :root, :"$2"}, [], [:"$$"] }, { {:"$1", :root}, [], [:"$$"] }, { {:"$1", {:req_by, :_}, :"$2"}, [], [:"$$"] }, { {:"$1", {:req_by, :_}}, [], [:"$$"] }] do case ent do [id, bin] -> - :ets.insert state.store, {id, cached_why, bin} + :dets.insert state.store, {id, cached_why, bin} [id] -> - :ets.insert state.store, {id, cached_why} + :dets.insert state.store, {id, cached_why} end end # Set new roots as roots for root <- roots do - case :ets.lookup state.store, root do + case :dets.lookup state.store, root do [{^root, _, bin}] -> - :ets.insert state.store, {root, :root, bin} + :dets.insert state.store, {root, :root, bin} rec_set_dep(state, root, SData.term_unbin bin) [{^root, _}] -> - :ets.insert state.store, {root, :root} + :dets.insert state.store, {root, :root} [] -> init_rec_pull state, root, :root, [] end @@ -209,12 +209,12 @@ defmodule SApp.PageStore do defp rec_set_dep(state, hash, val0) do for dep <- SData.Page.refs val0 do - case :ets.lookup state.store, dep do + case :dets.lookup state.store, dep do [{^dep, _, bin}] -> - :ets.insert state.store, {dep, {:req_by, hash}, bin} + :dets.insert state.store, {dep, {:req_by, hash}, bin} rec_set_dep(state, dep, SData.term_unbin bin) [{^dep, _}] -> - :ets.insert state.store, {dep, {:req_by, hash}} + :dets.insert state.store, {dep, {:req_by, hash}} [] -> init_rec_pull state, dep, {:req_by, hash}, [] end @@ -226,7 +226,7 @@ defmodule SApp.PageStore do cache_cleanup = [ {{:_, {:cached, :'$1'}, :_}, [{:<, :'$1', currtime}], [true]}, {{:_, {:cached, :'$1'}}, [{:<, :'$1', currtime}], [true]} ] - :ets.select_delete(state.store, cache_cleanup) + :dets.select_delete(state.store, cache_cleanup) Process.send_after(self(), :clean_cache, @clean_cache_every * 1000) {:noreply, state} diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex index 7aa3758..57f2371 100644 --- a/shard/lib/manager.ex +++ b/shard/lib/manager.ex @@ -17,6 +17,11 @@ defmodule Shard.Manager do List of { id, manifest, pid | nil } + - :shard_state + + List of + { id, state } + - :shard_procs List of @@ -43,6 +48,7 @@ defmodule Shard.Manager do @peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom @shard_db [Application.get_env(:shard, :data_path), "shard_db"] |> Path.join |> String.to_atom + @shard_state [Application.get_env(:shard, :data_path), "shard_state"] |> Path.join |> String.to_atom @shard_peer_db [Application.get_env(:shard, :data_path), "shard_peer_db"] |> Path.join |> String.to_atom def start_link(my_port) do @@ -63,6 +69,7 @@ defmodule Shard.Manager do spawn fn -> Shard.Manifest.start manifest end end + :dets.open_file(@shard_state, [type: :set]) :dets.open_file(@shard_peer_db, [type: :bag]) :ets.new(:shard_procs, [:set, :protected, :named_table]) @@ -260,15 +267,41 @@ defmodule Shard.Manager do GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid}) end + @doc""" + Return the list of all shards. + """ def list_shards() do for [x] <- :dets.match(@shard_db, :"$1"), do: x end + @doc""" + Return the list of all peers + """ def list_peers() do for [x] <- :dets.match(@peer_db, :"$1"), do: x end + @doc""" + Return the list of all peer IDs that are interested in a certain shard + """ def get_shard_peers(shard_id) do for [x] <- :dets.match(@shard_peer_db, {shard_id, :"$1"}), do: x end + + @doc""" + Return the saved state value for a shard + """ + def load_state(shard_id) do + case :dets.lookup(@shard_state, shard_id) do + [{^shard_id, state}] -> state + _ -> nil + end + end + + @doc""" + Save a state value for a shard + """ + def save_state(shard_id, state) do + :dets.insert(@shard_state, {shard_id, state}) + end end -- cgit v1.2.3