aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--shard/lib/app/chat.ex6
-rw-r--r--shard/lib/app/pagestore.ex50
-rw-r--r--shard/lib/manager.ex33
3 files changed, 63 insertions, 26 deletions
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex
index fa62c9e..db2cb64 100644
--- a/shard/lib/app/chat.ex
+++ b/shard/lib/app/chat.ex
@@ -52,7 +52,8 @@ defmodule SApp.Chat do
Shard.Manager.dispatch_to(id, nil, self())
{:ok, page_store} = SApp.PageStore.start_link(id, :page_store)
mst = %MST{store: %SApp.PageStore{pid: page_store},
- cmp: &msg_cmp/2}
+ cmp: &msg_cmp/2,
+ root: Shard.Manager.load_state(id)}
GenServer.cast(self(), :init_pull)
{:ok,
%{channel: channel,
@@ -104,6 +105,7 @@ defmodule SApp.Chat do
prev_root = state.mst.root
mst = MST.insert(state.mst, msgitem)
state = %{state | mst: mst}
+ Shard.Manager.save_state(state.id, mst.root)
for pid <- state.subs do
if Process.alive?(pid) do
@@ -160,6 +162,7 @@ defmodule SApp.Chat do
if mst2.root == new_root do
# This was the only message missing, we are happy!
state = %{state | mst: mst2}
+ Shard.Manager.save_state(state.id, mst2.root)
GenServer.cast(state.page_store, {:set_roots, [mst2.root]})
msg_callback(state, msgitem)
state
@@ -200,6 +203,7 @@ defmodule SApp.Chat do
end
GenServer.cast(state.page_store, {:set_roots, [mst.root]})
+ Shard.Manager.save_state(state.id, mst.root)
%{state | mst: mst}
end
diff --git a/shard/lib/app/pagestore.ex b/shard/lib/app/pagestore.ex
index 9944544..7962084 100644
--- a/shard/lib/app/pagestore.ex
+++ b/shard/lib/app/pagestore.ex
@@ -36,17 +36,17 @@ defmodule SApp.PageStore do
def init([shard_id, path]) do
Shard.Manager.dispatch_to(shard_id, path, self())
- store_path = String.to_atom "#{Base.encode16 shard_id}/#{Atom.to_string path}"
- store = :ets.new store_path, [:set, :protected]
+ store_path = [Application.get_env(:shard, :data_path), "#{shard_id|>Base.encode16}.#{path}"] |> Path.join |> String.to_atom
+ {:ok, store} = :dets.open_file store_path, [type: :set]
- Process.send_after(self(), :clean_cache, @clean_cache_every * 1000)
+ Process.send_after(self(), :clean_cache, 1000)
{:ok, %State{shard_id: shard_id, path: path, store: store, reqs: %{}, retries: %{}}}
end
def handle_call({:get, key, prefer_ask}, from, state) do
- case :ets.lookup state.store, key do
+ case :dets.lookup state.store, key do
[{_, _, bin}] ->
{:reply, bin, state}
[{_, _}] ->
@@ -77,12 +77,12 @@ defmodule SApp.PageStore do
end
defp store_put(state, hash, bin) do
- case :ets.lookup state.store, hash do
+ case :dets.lookup state.store, hash do
[] ->
- :ets.insert state.store, {hash, {:cached, System.os_time(:seconds) + @cache_ttl}, bin}
+ :dets.insert state.store, {hash, {:cached, System.os_time(:seconds) + @cache_ttl}, bin}
nil
[{_, why}] ->
- :ets.insert state.store, {hash, why, bin}
+ :dets.insert state.store, {hash, why, bin}
why
[{_, _, _}] ->
nil
@@ -91,18 +91,18 @@ defmodule SApp.PageStore do
defp init_rec_pull(state, key, why, prefer_ask) do
case prefer_ask do
- [] ->
- ask_random_peers(state, key)
- _ ->
+ [_|_] ->
for peer <- prefer_ask do
Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
end
+ _ ->
+ ask_random_peers(state, key)
end
- :ets.insert state.store, {key, why}
+ :dets.insert state.store, {key, why}
end
def handle_cast({:rec_pull, hash, ask_to}, state) do
- if :ets.lookup state.store, hash == [] do
+ if :dets.lookup state.store, hash == [] do
why = {:cached, System.os_time(:seconds) + @cache_ttl}
init_rec_pull(state, hash, why, ask_to)
end
@@ -112,7 +112,7 @@ defmodule SApp.PageStore do
def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do
state = case msg do
{:get, key} ->
- case :ets.lookup state.store, key do
+ case :dets.lookup state.store, key do
[{_, _, bin}] ->
Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, bin}})
_ ->
@@ -120,7 +120,7 @@ defmodule SApp.PageStore do
end
state
{:info, hash, bin} ->
- already_have_it = case :ets.lookup state.store, hash do
+ already_have_it = case :dets.lookup state.store, hash do
[{_, _, _}] -> true
_ -> false
end
@@ -142,7 +142,7 @@ defmodule SApp.PageStore do
end
value = SData.term_unbin bin
for dep <- SData.Page.refs value do
- if :ets.lookup state.store, dep == [] do
+ if :dets.lookup state.store, dep == [] do
init_rec_pull(state, dep, sub_why, [peer_id])
end
end
@@ -179,27 +179,27 @@ defmodule SApp.PageStore do
cached_why = {:cached, System.os_time(:seconds) + @cache_ttl}
# Set old roots and their deps as cached
- for ent <- :ets.select state.store, [{ {:"$1", :root, :"$2"}, [], [:"$$"] },
+ for ent <- :dets.select state.store, [{ {:"$1", :root, :"$2"}, [], [:"$$"] },
{ {:"$1", :root}, [], [:"$$"] },
{ {:"$1", {:req_by, :_}, :"$2"}, [], [:"$$"] },
{ {:"$1", {:req_by, :_}}, [], [:"$$"] }]
do
case ent do
[id, bin] ->
- :ets.insert state.store, {id, cached_why, bin}
+ :dets.insert state.store, {id, cached_why, bin}
[id] ->
- :ets.insert state.store, {id, cached_why}
+ :dets.insert state.store, {id, cached_why}
end
end
# Set new roots as roots
for root <- roots do
- case :ets.lookup state.store, root do
+ case :dets.lookup state.store, root do
[{^root, _, bin}] ->
- :ets.insert state.store, {root, :root, bin}
+ :dets.insert state.store, {root, :root, bin}
rec_set_dep(state, root, SData.term_unbin bin)
[{^root, _}] ->
- :ets.insert state.store, {root, :root}
+ :dets.insert state.store, {root, :root}
[] ->
init_rec_pull state, root, :root, []
end
@@ -209,12 +209,12 @@ defmodule SApp.PageStore do
defp rec_set_dep(state, hash, val0) do
for dep <- SData.Page.refs val0 do
- case :ets.lookup state.store, dep do
+ case :dets.lookup state.store, dep do
[{^dep, _, bin}] ->
- :ets.insert state.store, {dep, {:req_by, hash}, bin}
+ :dets.insert state.store, {dep, {:req_by, hash}, bin}
rec_set_dep(state, dep, SData.term_unbin bin)
[{^dep, _}] ->
- :ets.insert state.store, {dep, {:req_by, hash}}
+ :dets.insert state.store, {dep, {:req_by, hash}}
[] ->
init_rec_pull state, dep, {:req_by, hash}, []
end
@@ -226,7 +226,7 @@ defmodule SApp.PageStore do
cache_cleanup = [ {{:_, {:cached, :'$1'}, :_}, [{:<, :'$1', currtime}], [true]},
{{:_, {:cached, :'$1'}}, [{:<, :'$1', currtime}], [true]} ]
- :ets.select_delete(state.store, cache_cleanup)
+ :dets.select_delete(state.store, cache_cleanup)
Process.send_after(self(), :clean_cache, @clean_cache_every * 1000)
{:noreply, state}
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex
index 7aa3758..57f2371 100644
--- a/shard/lib/manager.ex
+++ b/shard/lib/manager.ex
@@ -17,6 +17,11 @@ defmodule Shard.Manager do
List of
{ id, manifest, pid | nil }
+ - :shard_state
+
+ List of
+ { id, state }
+
- :shard_procs
List of
@@ -43,6 +48,7 @@ defmodule Shard.Manager do
@peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom
@shard_db [Application.get_env(:shard, :data_path), "shard_db"] |> Path.join |> String.to_atom
+ @shard_state [Application.get_env(:shard, :data_path), "shard_state"] |> Path.join |> String.to_atom
@shard_peer_db [Application.get_env(:shard, :data_path), "shard_peer_db"] |> Path.join |> String.to_atom
def start_link(my_port) do
@@ -63,6 +69,7 @@ defmodule Shard.Manager do
spawn fn -> Shard.Manifest.start manifest end
end
+ :dets.open_file(@shard_state, [type: :set])
:dets.open_file(@shard_peer_db, [type: :bag])
:ets.new(:shard_procs, [:set, :protected, :named_table])
@@ -260,15 +267,41 @@ defmodule Shard.Manager do
GenServer.cast(__MODULE__, {:dispatch_to, shard_id, path, pid})
end
+ @doc"""
+ Return the list of all shards.
+ """
def list_shards() do
for [x] <- :dets.match(@shard_db, :"$1"), do: x
end
+ @doc"""
+ Return the list of all peers
+ """
def list_peers() do
for [x] <- :dets.match(@peer_db, :"$1"), do: x
end
+ @doc"""
+ Return the list of all peer IDs that are interested in a certain shard
+ """
def get_shard_peers(shard_id) do
for [x] <- :dets.match(@shard_peer_db, {shard_id, :"$1"}), do: x
end
+
+ @doc"""
+ Return the saved state value for a shard
+ """
+ def load_state(shard_id) do
+ case :dets.lookup(@shard_state, shard_id) do
+ [{^shard_id, state}] -> state
+ _ -> nil
+ end
+ end
+
+ @doc"""
+ Save a state value for a shard
+ """
+ def save_state(shard_id, state) do
+ :dets.insert(@shard_state, {shard_id, state})
+ end
end