author    Alex Auvolat <alex@adnab.me>  2018-09-11 17:21:14 +0200
committer Alex Auvolat <alex@adnab.me>  2018-09-11 17:21:14 +0200
commit 9695231c327e052c5d5bc61197cc8222599fb91a (patch)
tree   c0d0f247269729658ad354a32475f34fa7268ed1
parent a033c82a3c656a8f53feb60b5b149680771ac247 (diff)
Block store dependency management & caching (NOT TESTED)
-rw-r--r--   shard/lib/app/blockstore.ex          180
-rw-r--r--   shard/lib/app/chat.ex                 13
-rw-r--r--   shard/lib/cli/cli.ex                  18
-rw-r--r--   shard/lib/data/merklesearchtree.ex    51
-rw-r--r--   shard/lib/manager.ex                   2
-rw-r--r--   shard/test/conn_test.exs               5
-rw-r--r--   shard/test/mst_test.exs               20
-rw-r--r--   shard/test/test_helper.exs             2
8 files changed, 192 insertions, 99 deletions
diff --git a/shard/lib/app/blockstore.ex b/shard/lib/app/blockstore.ex
index 5e93135..f1140b2 100644
--- a/shard/lib/app/blockstore.ex
+++ b/shard/lib/app/blockstore.ex
@@ -5,7 +5,14 @@ defmodule SApp.BlockStore do
This is not a shard, it is a side process that a shard may use to store its data.
- TODO: WIP
+ Uses an ETS table of:
+
+ { block_id, why_have_it } -- waiting for data
+ { block_id, why_have_it, data } -- once we have the data
+
+ why_have_it := :root
+ | {:req_by, some_other_block_id}
+ | {:cached, expiry_date}
"""
use GenServer
@@ -13,6 +20,8 @@ defmodule SApp.BlockStore do
@enforce_keys [:pid]
defstruct [:pid, :prefer_ask]
+ @cache_ttl 600 # Ten minutes
+ @clean_cache_every 60 # One minute
defmodule State do
defstruct [:shard_id, :path, :store, :reqs, :retries]
@@ -26,47 +35,86 @@ defmodule SApp.BlockStore do
def init([shard_id, path]) do
Shard.Manager.dispatch_to(shard_id, path, self())
- {:ok, %State{shard_id: shard_id, path: path, store: %{}, reqs: %{}, retries: %{}}}
+ store_path = String.to_atom "#{Base.encode16 shard_id}/#{Atom.to_string path}"
+ store = :ets.new store_path, [:set, :protected]
+
+ Process.send_after(self(), :clean_cache, @clean_cache_every * 1000)
+
+ {:ok, %State{shard_id: shard_id, path: path, store: store, reqs: %{}, retries: %{}}}
end
def handle_call({:get, key, prefer_ask}, from, state) do
- case state.store[key] do
- nil ->
- case prefer_ask do
- [_ | _] ->
- for peer <- prefer_ask do
- Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
- end
- _ ->
- ask_random_peers(state, key)
- end
- reqs_key = case state.reqs[key] do
- nil ->
- MapSet.put(MapSet.new(), from)
- ms ->
- MapSet.put(ms, from)
- end
- state = put_in(state.reqs[key], reqs_key)
- {:noreply, state}
- v ->
+ case :ets.lookup state.store, key do
+ [{_, _, v}] ->
{:reply, v, state}
+ [{_, _}] ->
+ state = add_request(state, key, from)
+ {:noreply, state}
+ [] ->
+ why = {:cached, System.os_time(:seconds) + @cache_ttl}
+ init_rec_pull(state, key, why, prefer_ask)
+ state = add_request(state, key, from)
+ {:noreply, state}
end
end
def handle_call({:put, val}, _from, state) do
hash = SData.term_hash val
- state = %{state | store: Map.put(state.store, hash, val)}
+ store_put(state, hash, val)
{:reply, hash, state}
end
+ defp add_request(state, key, from) do
+ reqs_key = case state.reqs[key] do
+ nil ->
+ MapSet.put(MapSet.new(), from)
+ ms ->
+ MapSet.put(ms, from)
+ end
+ put_in(state.reqs[key], reqs_key)
+ end
+
+ defp store_put(state, hash, val) do
+ case :ets.lookup state.store, hash do
+ [] ->
+ :ets.insert state.store, {hash, {:cached, System.os_time(:seconds) + @cache_ttl}, val}
+ nil
+ [{_, why}] ->
+ :ets.insert state.store, {hash, why, val}
+ why
+ [{_, _, _}] ->
+ nil
+ end
+ end
+
+ defp init_rec_pull(state, key, why, prefer_ask) do
+ case prefer_ask do
+ [_ | _] ->
+ for peer <- prefer_ask do
+ Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
+ end
+ _ ->
+ ask_random_peers(state, key)
+ end
+ :ets.insert state.store, {key, why}
+ end
+
+ def handle_cast({:rec_pull, hash, ask_to}, state) do
+ if :ets.lookup(state.store, hash) == [] do
+ why = {:cached, System.os_time(:seconds) + @cache_ttl}
+ init_rec_pull(state, hash, why, ask_to)
+ end
+ {:noreply, state}
+ end
+
def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do
state = case msg do
{:get, key} ->
- case state.store[key] do
- nil ->
- Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}})
- v ->
+ case :ets.lookup state.store, key do
+ [{_, _, v}] ->
Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, v}})
+ _ ->
+ Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}})
end
state
{:info, hash, value} ->
@@ -80,12 +128,22 @@ defmodule SApp.BlockStore do
Map.delete(state.reqs, hash)
end
state = %{state | retries: Map.delete(state.retries, hash)}
- %{state | store: Map.put(state.store, hash, value), reqs: reqs}
+ rec_why = store_put state, hash, value
+ if rec_why != nil do
+ sub_why = case rec_why do
+ {:cached, ttl} -> {:cached, ttl}
+ _ -> {:req_by, hash}
+ end
+ for dep <- SData.Page.refs value do
+ init_rec_pull(state, dep, sub_why, [peer_id])
+ end
+ end
+ %{state | reqs: reqs}
else
state
end
{:not_found, key} ->
- if state.reqs[key] != nil and state.store[key] == nil do
+ if state.reqs[key] != nil and :ets.lookup(state.store, key) == [] do
nretry = case state.retries[key] do
nil -> 1
n -> n+1
@@ -107,6 +165,64 @@ defmodule SApp.BlockStore do
end
{:noreply, state}
end
+
+ def handle_cast({:set_roots, roots}, state) do
+ cached_why = {:cached, System.os_time(:seconds) + @cache_ttl}
+
+ # Set old roots as cached
+ for [id, val] <- :ets.match state.store, {:"$1", :root, :"$2"} do
+ :ets.insert state.store, {id, cached_why, val}
+ end
+ for [id] <- :ets.match state.store, {:"$1", :root} do
+ :ets.insert state.store, {id, cached_why}
+ end
+
+ # Set old deps as cached
+ for [id, val] <- :ets.match state.store, {:"$1", {:req_by, :_}, :"$2"} do
+ :ets.insert state.store, {id, cached_why, val}
+ end
+ for [id] <- :ets.match state.store, {:"$1", {:req_by, :_}} do
+ :ets.insert state.store, {id, cached_why}
+ end
+
+ # Set new roots as roots
+ for root <- roots do
+ case :ets.lookup state.store, root do
+ [{^root, _, val}] ->
+ :ets.insert state.store, {root, :root, val}
+ rec_set_dep state.store, root, val
+ [{^root, _}] ->
+ :ets.insert state.store, {root, :root}
+ [] ->
+ init_rec_pull state, root, :root, []
+ end
+ end
+ {:noreply, state}
+ end
+
+ defp rec_set_dep(store, hash, val0) do
+ for dep <- SData.Page.refs val0 do
+ case :ets.lookup store, dep do
+ [{^dep, _, val}] ->
+ :ets.insert store, {dep, {:req_by, hash}, val}
+ rec_set_dep(store, dep, val)
+ _ ->
+ :ets.insert store, {dep, {:req_by, hash}}
+ end
+ end
+ end
+
+ def handle_info(:clean_cache, state) do
+ currtime = System.os_time :seconds
+
+ cache_cleanup_1 = [ {{:_, {:cached, :'$1'}, :_}, [{:<, :'$1', currtime}], [true]} ]
+ cache_cleanup_2 = [ {{:_, {:cached, :'$1'}}, [{:<, :'$1', currtime}], [true]} ]
+ :ets.select_delete(state.store, cache_cleanup_1)
+ :ets.select_delete(state.store, cache_cleanup_2)
+
+ Process.send_after(self(), :clean_cache, @clean_cache_every * 1000)
+ {:noreply, state}
+ end
def ask_random_peers(state, key) do
peers = Shard.Manager.get_shard_peers(state.shard_id)
@@ -117,7 +233,6 @@ defmodule SApp.BlockStore do
end
end
-
defimpl SData.PageStore do
def put(store, page) do
hash = GenServer.call(store.pid, {:put, page})
@@ -133,12 +248,7 @@ defmodule SApp.BlockStore do
end
def copy(store, other_store, hash) do
- page = SData.PageStore.get(other_store, hash)
- refs = SData.Page.refs(page)
- for ref <- refs do
- copy(store, other_store, ref)
- end
- GenServer.call(store.pid, {:put, page})
+ GenServer.cast(store.pid, {:rec_pull, hash, other_store.prefer_ask})
store
end
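
The module doc above describes the {block_id, why_have_it[, data]} layout that replaces the old map-based store. As a rough standalone sketch (the table name, block ids and data below are made up for illustration and are not part of the commit), the three row shapes a lookup can return are handled like this:

    # Hypothetical illustration of the ETS layout described in the module doc.
    store = :ets.new(:blockstore_sketch, [:set, :protected])

    parent_id = :crypto.hash(:sha256, "parent block")
    child_id  = :crypto.hash(:sha256, "child block")

    # A block we only want so far (no data yet), pulled as a dependency of parent_id:
    :ets.insert(store, {child_id, {:req_by, parent_id}})

    # Once the data arrives, the row gains a third element but keeps its why_have_it tag:
    :ets.insert(store, {child_id, {:req_by, parent_id}, "child data"})

    # A cached block instead carries an expiry timestamp:
    :ets.insert(store, {parent_id, {:cached, System.os_time(:second) + 600}, "parent data"})

    # A lookup then distinguishes the three cases the GenServer handles in handle_call({:get, ...}):
    case :ets.lookup(store, child_id) do
      [{_, _, data}] -> {:have_data, data}
      [{_, why}]     -> {:waiting, why}
      []             -> :unknown_block
    end

Because the table is a :set keyed on the block id, the second insert simply replaces the waiting row, which is what store_put relies on when a block's data comes in.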
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex
index 471d8f7..4752dc6 100644
--- a/shard/lib/app/chat.ex
+++ b/shard/lib/app/chat.ex
@@ -160,6 +160,7 @@ defmodule SApp.Chat do
if mst2.root == new_root do
# This was the only message missing, we are happy!
state = %{state | mst: mst2}
+ GenServer.cast(state.block_store, {:set_roots, [mst2.root]})
msg_callback(state, msgitem)
state
else
@@ -186,9 +187,19 @@ defmodule SApp.Chat do
defp init_merge(state, new_root, source_peer) do
# TODO: make the merge asynchronous
+ prev_last = for {x, true} <- MST.last(state.mst, nil, 100), into: MapSet.new, do: x
+
mgmst = %{state.mst | root: new_root}
mgmst = put_in(mgmst.store.prefer_ask, [source_peer])
- mst = MST.merge(state.mst, mgmst, fn msgitem, true -> msg_callback(state, msgitem) end)
+ mst = MST.merge(state.mst, mgmst)
+
+ for {x, true} <- MST.last(mst, nil, 100) do
+ if not MapSet.member? prev_last, x do
+ msg_callback(state, x)
+ end
+ end
+
+ GenServer.cast(state.block_store, {:set_roots, [mst.root]})
%{state | mst: mst}
end
diff --git a/shard/lib/cli/cli.ex b/shard/lib/cli/cli.ex
index c3afe8f..bf3a555 100644
--- a/shard/lib/cli/cli.ex
+++ b/shard/lib/cli/cli.ex
@@ -4,6 +4,10 @@ defmodule SCLI do
"""
def run() do
+ for {_chid, _manifest, chpid} <- Shard.Manager.list_shards do
+ GenServer.cast(chpid, {:subscribe, self()})
+ end
+
run(nil)
end
@@ -64,11 +68,15 @@ defmodule SCLI do
end
defp handle_command(pid, ["hist"]) do
- GenServer.call(pid, {:read_history, nil, 25})
- |> Enum.each(fn {{ts, nick, msg}, true} ->
- IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}"
- end)
- pid
+ if pid == nil do
+ IO.puts "Not currently on a channel!"
+ else
+ GenServer.call(pid, {:read_history, nil, 25})
+ |> Enum.each(fn {{ts, nick, msg}, true} ->
+ IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}"
+ end)
+ pid
+ end
end
defp handle_command(_pid, ["join", qchan]) do
diff --git a/shard/lib/data/merklesearchtree.ex b/shard/lib/data/merklesearchtree.ex
index 039f6ce..49d54a5 100644
--- a/shard/lib/data/merklesearchtree.ex
+++ b/shard/lib/data/merklesearchtree.ex
@@ -193,21 +193,19 @@ defmodule SData.MerkleSearchTree do
The merge is not symmetrical in the sense that:
- new pages are added in the store of the first argument
- - the callback is called for all items found in the second argument and not the first
"""
- def merge(to, from, callback \\ fn _, _ -> nil end) do
- { store, root } = merge_aux(to, from, to.store, to.root, from.root, callback)
+ def merge(to, from) do
+ { store, root } = merge_aux(to, from, to.store, to.root, from.root)
%{ to | store: store, root: root }
end
- defp merge_aux(s1, s2, store, r1, r2, callback) do
+ defp merge_aux(s1, s2, store, r1, r2) do
case {r1, r2} do
_ when r1 == r2 -> { store, r1 }
{_, nil} ->
{ store, r1 }
{nil, _} ->
store = Store.copy(store, s2.store, r2)
- rec_callback(store, r2, callback)
{ store, r2 }
_ ->
%Page{ level: level1, low: low1, list: lst1 } = Store.get(store, r1)
@@ -217,64 +215,49 @@ defmodule SData.MerkleSearchTree do
level1 > level2 -> {level1, low1, lst1, r2, []}
level2 > level1 -> {level2, r1, [], low2, lst2}
end
- { store, low, lst } = merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback)
+ { store, low, lst } = merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2)
page = %Page{ level: level, low: low, list: lst }
{hash, store} = Store.put(store, page)
{store, hash}
end
end
- defp merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback) do
+ defp merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2) do
case {lst1, lst2} do
{ [], [] } ->
- {store, hash} = merge_aux(s1, s2, store, low1, low2, callback)
+ {store, hash} = merge_aux(s1, s2, store, low1, low2)
{store, hash, []}
{ [], [ {k, v, r} | rst2 ] } ->
{low1l, low1h, store} = split(s1, store, low1, k)
- {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback)
- callback.(k, v)
- {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, [], r, rst2, callback)
+ {store, newlow} = merge_aux(s1, s2, store, low1l, low2)
+ {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, [], r, rst2)
{store, newlow, [ {k, v, newr} | newrst ]}
{ [ {k, v, r} | rst1 ], [] } ->
{low2l, low2h, store} = split(s2, store, low2, k)
- {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback)
- {store, newr, newrst} = merge_aux_rec(s1, s2, store, r, rst1, low2h, [], callback)
+ {store, newlow} = merge_aux(s1, s2, store, low1, low2l)
+ {store, newr, newrst} = merge_aux_rec(s1, s2, store, r, rst1, low2h, [])
{store, newlow, [ {k, v, newr} | newrst ]}
{ [ {k1, v1, r1} | rst1 ], [ {k2, v2, r2} | rst2 ] } ->
case s1.cmp.(k1, k2) do
:before ->
{low2l, low2h, store} = split(s2, store, low2, k1)
- {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback)
- {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, low2h, lst2, callback)
+ {store, newlow} = merge_aux(s1, s2, store, low1, low2l)
+ {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, low2h, lst2)
{store, newlow, [ {k1, v1, newr} | newrst ]}
:after ->
{low1l, low1h, store} = split(s1, store, low1, k2)
- {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback)
- callback.(k2, v2)
- {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, lst1, r2, rst2, callback)
+ {store, newlow} = merge_aux(s1, s2, store, low1l, low2)
+ {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, lst1, r2, rst2)
{store, newlow, [ {k2, v2, newr} | newrst ]}
:duplicate ->
- {store, newlow} = merge_aux(s1, s2, store, low1, low2, callback)
- newv = s1.merge.(v1, v2) ## TODO: callback here ??
- {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, r2, rst2, callback)
+ {store, newlow} = merge_aux(s1, s2, store, low1, low2)
+ newv = s1.merge.(v1, v2)
+ {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, r2, rst2)
{store, newlow, [ {k1, newv, newr} | newrst ]}
end
end
end
- defp rec_callback(store, root, callback) do
- case root do
- nil -> nil
- _ ->
- %Page{ level: _, low: low, list: lst } = Store.get(store, root)
- rec_callback(store, low, callback)
- for {k, v, rst} <- lst do
- callback.(k, v)
- rec_callback(store, rst, callback)
- end
- end
- end
-
@doc"""
Get value for a specific key in search tree.
"""
diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex
index 3f2bddb..8cedce2 100644
--- a/shard/lib/manager.ex
+++ b/shard/lib/manager.ex
@@ -33,7 +33,7 @@ defmodule Shard.Manager do
- :outbox
Multi-list of
- { peer_id, message }
+ { peer_id, message, time_inserted }
"""
diff --git a/shard/test/conn_test.exs b/shard/test/conn_test.exs
index 275f6dd..ae43d9d 100644
--- a/shard/test/conn_test.exs
+++ b/shard/test/conn_test.exs
@@ -6,10 +6,10 @@ defmodule ShardTest.Conn do
require Salty.Sign.Ed25519, as: Sign
test "crypto connection" do
- {:ok, srv_pkey, srv_skey} = Sign.keypair
+ {srv_pkey, srv_skey} = Shard.Identity.get_keypair
{:ok, sess_pkey, sess_skey} = Box.keypair
{:ok, challenge} = Salty.Random.buf 32
- {:ok, socket} = :gen_tcp.connect {127,0,0,1}, 4044, [:binary, packet: 2, active: false]
+ {:ok, socket} = :gen_tcp.connect {127,0,0,1}, 4045, [:binary, packet: 2, active: false]
hello = {srv_pkey, sess_pkey, challenge, 0}
:gen_tcp.send(socket, :erlang.term_to_binary hello)
@@ -50,6 +50,7 @@ defmodule ShardTest.Conn do
receive do after 100 -> nil end
end
+ @tag :skip
test "connect to chat rooms" do
{:ok, pid1} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "test"})
{:ok, pid2} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "other_test"})
diff --git a/shard/test/mst_test.exs b/shard/test/mst_test.exs
index c1758ad..cf5a898 100644
--- a/shard/test/mst_test.exs
+++ b/shard/test/mst_test.exs
@@ -174,24 +174,4 @@ defmodule ShardTest.MST do
assert MST.last(mg1, nil, 2000) == all_items
end
- test "merkle search tree 8: MST.merge callback" do
- items1 = (for i <- 1..1000, do: i*2+40)
- items2 = (for i <- 1..1000, do: i*3)
-
- y = Enum.reduce(items1, %MST{}, fn i, acc -> MST.insert(acc, i) end)
- z = Enum.reduce(items2, %MST{}, fn i, acc -> MST.insert(acc, i) end)
-
- {:ok, cb_called} = Agent.start_link fn -> [] end
-
- cb = fn i, true -> Agent.update(cb_called, fn x -> [i | x] end) end
- mg = MST.merge(y, z, cb)
-
- cb_vals = Agent.get cb_called, &(&1)
- expected = MapSet.difference(MapSet.new(items2), MapSet.new(items1))
- |> MapSet.to_list
- |> Enum.sort
- |> Enum.reverse
- assert expected == cb_vals
- end
-
end
diff --git a/shard/test/test_helper.exs b/shard/test/test_helper.exs
index e5b6600..e6a4f8e 100644
--- a/shard/test/test_helper.exs
+++ b/shard/test/test_helper.exs
@@ -1,4 +1,4 @@
-ExUnit.start()
+ExUnit.start(exclude: [:skip])
case :gen_tcp.connect('localhost', 4045, []) do
{:ok, socket} ->