aboutsummaryrefslogtreecommitdiff
path: root/shard/lib
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2018-09-25 19:57:35 +0200
committerAlex Auvolat <alex@adnab.me>2018-09-25 19:57:35 +0200
commit78e5caa664b860189ea86f95fe68ad5e6705897b (patch)
treec2d932af6ed1451b6c4bc11ebbe37ddc7ac8f90b /shard/lib
parentbfdcc107028f64d3d4a6e38ae9732d33182502d6 (diff)
downloadshard-78e5caa664b860189ea86f95fe68ad5e6705897b.tar.gz
shard-78e5caa664b860189ea86f95fe68ad5e6705897b.zip
s/block/page, change semantics to use more binaries, fixes
Diffstat (limited to 'shard/lib')
-rw-r--r--shard/lib/app/chat.ex10
-rw-r--r--shard/lib/app/pagestore.ex (renamed from shard/lib/app/blockstore.ex)116
-rw-r--r--shard/lib/data/data.ex12
3 files changed, 80 insertions, 58 deletions
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex
index 4752dc6..fa62c9e 100644
--- a/shard/lib/app/chat.ex
+++ b/shard/lib/app/chat.ex
@@ -50,15 +50,15 @@ defmodule SApp.Chat do
case Shard.Manager.register(id, manifest, self()) do
:ok ->
Shard.Manager.dispatch_to(id, nil, self())
- {:ok, block_store} = SApp.BlockStore.start_link(id, :block_store)
- mst = %MST{store: %SApp.BlockStore{pid: block_store},
+ {:ok, page_store} = SApp.PageStore.start_link(id, :page_store)
+ mst = %MST{store: %SApp.PageStore{pid: page_store},
cmp: &msg_cmp/2}
GenServer.cast(self(), :init_pull)
{:ok,
%{channel: channel,
id: id,
manifest: manifest,
- block_store: block_store,
+ page_store: page_store,
mst: mst,
subs: MapSet.new,
}
@@ -160,7 +160,7 @@ defmodule SApp.Chat do
if mst2.root == new_root do
# This was the only message missing, we are happy!
state = %{state | mst: mst2}
- GenServer.cast(state.block_store, {:set_roots, [mst2.root]})
+ GenServer.cast(state.page_store, {:set_roots, [mst2.root]})
msg_callback(state, msgitem)
state
else
@@ -199,7 +199,7 @@ defmodule SApp.Chat do
end
end
- GenServer.cast(state.block_store, {:set_roots, [mst.root]})
+ GenServer.cast(state.page_store, {:set_roots, [mst.root]})
%{state | mst: mst}
end
diff --git a/shard/lib/app/blockstore.ex b/shard/lib/app/pagestore.ex
index 077235a..5f2ba54 100644
--- a/shard/lib/app/blockstore.ex
+++ b/shard/lib/app/pagestore.ex
@@ -1,17 +1,17 @@
-defmodule SApp.BlockStore do
+defmodule SApp.PageStore do
@moduledoc """
- A module that implements a content-adressable storage (blocks, or pages,
+ A module that implements a content-adressable storage (pages,
identified by the hash of their contents).
This is not a shard, it is a side process that a shard may use to store its data.
Uses an ETS table of:
- { block_id, why_have_it } -- waiting for data
- { block_id, why_have_it, data } -- once we have the data
+ { page_id, why_have_it } -- waiting for data
+ { page_id, why_have_it, data } -- once we have the data
why_have_it := :root
- | {:req_by, some_other_block_id}
+ | {:req_by, some_other_page_id}
| {:cached, expiry_date}
"""
@@ -22,6 +22,7 @@ defmodule SApp.BlockStore do
@cache_ttl 600 # Ten minutes
@clean_cache_every 60 # One minute
+ @max_failures 4 # Maximum of peers that reply not_found before we abandon
defmodule State do
defstruct [:shard_id, :path, :store, :reqs, :retries]
@@ -46,8 +47,8 @@ defmodule SApp.BlockStore do
def handle_call({:get, key, prefer_ask}, from, state) do
case :ets.lookup state.store, key do
- [{_, _, v}] ->
- {:reply, v, state}
+ [{_, _, bin}] ->
+ {:reply, bin, state}
[{_, _}] ->
state = add_request(state, key, from)
{:noreply, state}
@@ -59,9 +60,9 @@ defmodule SApp.BlockStore do
end
end
- def handle_call({:put, val}, _from, state) do
- hash = SData.term_hash val
- store_put(state, hash, val)
+ def handle_call({:put, bin}, _from, state) do
+ hash = SData.bin_hash bin
+ store_put(state, hash, bin)
{:reply, hash, state}
end
@@ -75,13 +76,13 @@ defmodule SApp.BlockStore do
put_in(state.reqs[key], reqs_key)
end
- defp store_put(state, hash, val) do
+ defp store_put(state, hash, bin) do
case :ets.lookup state.store, hash do
[] ->
- :ets.insert state.store, {hash, {:cached, System.os_time(:seconds) + @cache_ttl}, val}
+ :ets.insert state.store, {hash, {:cached, System.os_time(:seconds) + @cache_ttl}, bin}
nil
[{_, why}] ->
- :ets.insert state.store, {hash, why, val}
+ :ets.insert state.store, {hash, why, bin}
why
[{_, _, _}] ->
nil
@@ -90,12 +91,12 @@ defmodule SApp.BlockStore do
defp init_rec_pull(state, key, why, prefer_ask) do
case prefer_ask do
- [_ | _] ->
+ [] ->
+ ask_random_peers(state, key)
+ _ ->
for peer <- prefer_ask do
Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
end
- _ ->
- ask_random_peers(state, key)
end
:ets.insert state.store, {key, why}
end
@@ -112,44 +113,51 @@ defmodule SApp.BlockStore do
state = case msg do
{:get, key} ->
case :ets.lookup state.store, key do
- [{_, _, v}] ->
- Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, v}})
+ [{_, _, bin}] ->
+ Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, bin}})
_ ->
Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}})
end
state
- {:info, hash, value} ->
- if SData.term_hash value == hash do
+ {:info, hash, bin} ->
+ already_have_it = case :ets.lookup state.store, hash do
+ [{_, _, _}] -> true
+ _ -> false
+ end
+ if SData.bin_hash bin == hash and not already_have_it do
reqs = case state.reqs[hash] do
nil -> state.reqs
pids ->
for pid <- pids do
- GenServer.reply(pid, value)
+ GenServer.reply(pid, bin)
end
Map.delete(state.reqs, hash)
end
- state = %{state | retries: Map.delete(state.retries, hash)}
- rec_why = store_put state, hash, value
+ state = %{state | reqs: reqs, retries: Map.delete(state.retries, hash)}
+ rec_why = store_put(state, hash, bin)
if rec_why != nil do
sub_why = case rec_why do
{:cached, ttl} -> {:cached, ttl}
_ -> {:req_by, hash}
end
+ value = SData.term_unbin bin
for dep <- SData.Page.refs value do
- init_rec_pull(state, dep, sub_why, [peer_id])
+ if :ets.lookup state.store, dep == [] do
+ init_rec_pull(state, dep, sub_why, [peer_id])
+ end
end
end
- %{state | reqs: reqs}
+ state
else
state
end
{:not_found, key} ->
- if state.reqs[key] != nil and :ets.lookup state.store, key == [] do
+ if state.reqs[key] != nil do
nretry = case state.retries[key] do
nil -> 1
n -> n+1
end
- if nretry < 3 do
+ if nretry < @max_failures do
ask_random_peers(state, key)
%{state | retries: Map.put(state.retries, key, nretry)}
else
@@ -170,28 +178,26 @@ defmodule SApp.BlockStore do
def handle_cast({:set_roots, roots}, state) do
cached_why = {:cached, System.os_time(:seconds) + @cache_ttl}
- # Set old roots as cached
- for [id, val] <- :ets.match state.store, {:"$1", :root, :"$2"} do
- :ets.insert state.store, {id, cached_why, val}
- end
- for [id] <- :ets.match state.store, {:"$1", :root} do
- :ets.insert state.store, {id, cached_why}
- end
-
- # Set old deps as cached
- for [id, val] <- :ets.match state.store, {:"$1", {:req_by, :_}, :"$2"} do
- :ets.insert state.store, {id, cached_why, val}
- end
- for [id] <- :ets.match state.store, {:"$1", {:req_by, :_}} do
- :ets.insert state.store, {id, cached_why}
+ # Set old roots and their deps as cached
+ for ent <- :ets.select state.store, [{ {:"$1", :root, :"$2"}, [], [:"$$"] },
+ { {:"$1", :root}, [], [:"$$"] },
+ { {:"$1", {:req_by, :_}, :"$2"}, [], [:"$$"] },
+ { {:"$1", {:req_by, :_}}, [], [:"$$"] }]
+ do
+ case ent do
+ [id, bin] ->
+ :ets.insert state.store, {id, cached_why, bin}
+ [id] ->
+ :ets.insert state.store, {id, cached_why}
+ end
end
# Set new roots as roots
for root <- roots do
case :ets.lookup state.store, root do
- [{^root, _, val}] ->
- :ets.insert state.store, {root, :root, val}
- rec_set_dep state.store, root, val
+ [{^root, _, bin}] ->
+ :ets.insert state.store, {root, :root, bin}
+ rec_set_dep(state, root, SData.term_unbin bin)
[{^root, _}] ->
:ets.insert state.store, {root, :root}
[] ->
@@ -201,14 +207,16 @@ defmodule SApp.BlockStore do
{:noreply, state}
end
- defp rec_set_dep(store, hash, val0) do
+ defp rec_set_dep(state, hash, val0) do
for dep <- SData.Page.refs val0 do
- case :ets.lookup store, dep do
- [{^dep, _, val}] ->
- :ets.insert store, {dep, {:req_by, hash}, val}
- rec_set_dep(store, dep, val)
- _ ->
- :ets.insert store, {dep, {:req_by, hash}}
+ case :ets.lookup state.store, dep do
+ [{^dep, _, bin}] ->
+ :ets.insert state.store, {dep, {:req_by, hash}, bin}
+ rec_set_dep(state, dep, SData.term_unbin bin)
+ [{^dep, _}] ->
+ :ets.insert state.store, {dep, {:req_by, hash}}
+ [] ->
+ init_rec_pull state, dep, {:req_by, hash}, []
end
end
end
@@ -235,13 +243,15 @@ defmodule SApp.BlockStore do
defimpl SData.PageStore do
def put(store, page) do
- hash = GenServer.call(store.pid, {:put, page})
+ bin = SData.term_bin page
+ hash = GenServer.call(store.pid, {:put, bin})
{ hash, store }
end
def get(store, hash) do
try do
- GenServer.call(store.pid, {:get, hash, store.prefer_ask})
+ bin = GenServer.call(store.pid, {:get, hash, store.prefer_ask})
+ SData.term_unbin bin
catch
:exit, {:timeout, _} -> nil
end
diff --git a/shard/lib/data/data.ex b/shard/lib/data/data.ex
index c2c659d..78c73cd 100644
--- a/shard/lib/data/data.ex
+++ b/shard/lib/data/data.ex
@@ -18,6 +18,18 @@ defmodule SData do
:crypto.hash(algo, (:erlang.term_to_binary term))
end
+ def term_bin(term) do
+ :erlang.term_to_binary term
+ end
+
+ def bin_hash(bin, algo \\ :sha256) do
+ :crypto.hash(algo, bin)
+ end
+
+ def term_unbin(bin) do
+ :erlang.binary_to_term(bin, [:safe])
+ end
+
@doc"""
Compare function for arbitrary terms using the Erlang order
"""