diff options
author | Alex Auvolat <alex@adnab.me> | 2018-09-11 17:21:14 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2018-09-11 17:21:14 +0200 |
commit | 9695231c327e052c5d5bc61197cc8222599fb91a (patch) | |
tree | c0d0f247269729658ad354a32475f34fa7268ed1 /shard | |
parent | a033c82a3c656a8f53feb60b5b149680771ac247 (diff) | |
download | shard-9695231c327e052c5d5bc61197cc8222599fb91a.tar.gz shard-9695231c327e052c5d5bc61197cc8222599fb91a.zip |
Block store dependency management & caching (NOT TESTED)
Diffstat (limited to 'shard')
-rw-r--r-- | shard/lib/app/blockstore.ex | 180 | ||||
-rw-r--r-- | shard/lib/app/chat.ex | 13 | ||||
-rw-r--r-- | shard/lib/cli/cli.ex | 18 | ||||
-rw-r--r-- | shard/lib/data/merklesearchtree.ex | 51 | ||||
-rw-r--r-- | shard/lib/manager.ex | 2 | ||||
-rw-r--r-- | shard/test/conn_test.exs | 5 | ||||
-rw-r--r-- | shard/test/mst_test.exs | 20 | ||||
-rw-r--r-- | shard/test/test_helper.exs | 2 |
8 files changed, 192 insertions, 99 deletions
diff --git a/shard/lib/app/blockstore.ex b/shard/lib/app/blockstore.ex index 5e93135..f1140b2 100644 --- a/shard/lib/app/blockstore.ex +++ b/shard/lib/app/blockstore.ex @@ -5,7 +5,14 @@ defmodule SApp.BlockStore do This is not a shard, it is a side process that a shard may use to store its data. - TODO: WIP + Uses an ETS table of: + + { block_id, why_have_it } -- waiting for data + { block_id, why_have_it, data } -- once we have the data + + why_have_it := :root + | {:req_by, some_other_block_id} + | {:cached, expiry_date} """ use GenServer @@ -13,6 +20,8 @@ defmodule SApp.BlockStore do @enforce_keys [:pid] defstruct [:pid, :prefer_ask] + @cache_ttl 600 # Ten minutes + @clean_cache_every 60 # One minute defmodule State do defstruct [:shard_id, :path, :store, :reqs, :retries] @@ -26,47 +35,86 @@ defmodule SApp.BlockStore do def init([shard_id, path]) do Shard.Manager.dispatch_to(shard_id, path, self()) - {:ok, %State{shard_id: shard_id, path: path, store: %{}, reqs: %{}, retries: %{}}} + store_path = String.to_atom "#{Base.encode16 shard_id}/#{Atom.to_string path}" + store = :ets.new store_path, [:set, :protected] + + Process.send_after(self(), :clean_cache, @clean_cache_every * 1000) + + {:ok, %State{shard_id: shard_id, path: path, store: store, reqs: %{}, retries: %{}}} end def handle_call({:get, key, prefer_ask}, from, state) do - case state.store[key] do - nil -> - case prefer_ask do - [_ | _] -> - for peer <- prefer_ask do - Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) - end - _ -> - ask_random_peers(state, key) - end - reqs_key = case state.reqs[key] do - nil -> - MapSet.put(MapSet.new(), from) - ms -> - MapSet.put(ms, from) - end - state = put_in(state.reqs[key], reqs_key) - {:noreply, state} - v -> + case :ets.lookup state.store, key do + [{_, _, v}] -> {:reply, v, state} + [{_, _}] -> + state = add_request(state, key, from) + {:noreply, state} + [] -> + why = {:cached, System.os_time(:seconds) + @cache_ttl} + init_rec_pull(state, key, why, prefer_ask) + state = add_request(state, key, from) + {:noreply, state} end end def handle_call({:put, val}, _from, state) do hash = SData.term_hash val - state = %{state | store: Map.put(state.store, hash, val)} + store_put(state, hash, val) {:reply, hash, state} end + defp add_request(state, key, from) do + reqs_key = case state.reqs[key] do + nil -> + MapSet.put(MapSet.new(), from) + ms -> + MapSet.put(ms, from) + end + put_in(state.reqs[key], reqs_key) + end + + defp store_put(state, hash, val) do + case :ets.lookup state.store, hash do + [] -> + :ets.insert state.store, {hash, {:cached, System.os_time(:seconds) + @cache_ttl}, val} + nil + [{_, why}] -> + :ets.insert state.store, {hash, why, val} + why + [{_, _, _}] -> + nil + end + end + + defp init_rec_pull(state, key, why, prefer_ask) do + case prefer_ask do + [_ | _] -> + for peer <- prefer_ask do + Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) + end + _ -> + ask_random_peers(state, key) + end + :ets.insert state.store, {key, why} + end + + def handle_cast({:rec_pull, hash, ask_to}, state) do + if :ets.lookup state.store, hash == [] do + why = {:cached, System.os_time(:seconds) + @cache_ttl} + init_rec_pull(state, hash, why, ask_to) + end + {:noreply, state} + end + def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do state = case msg do {:get, key} -> - case state.store[key] do - nil -> - Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}}) - v -> + case :ets.lookup state.store, key do + [{_, _, v}] -> Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, v}}) + _ -> + Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}}) end state {:info, hash, value} -> @@ -80,12 +128,22 @@ defmodule SApp.BlockStore do Map.delete(state.reqs, hash) end state = %{state | retries: Map.delete(state.retries, hash)} - %{state | store: Map.put(state.store, hash, value), reqs: reqs} + rec_why = store_put state, hash, value + if rec_why != nil do + sub_why = case rec_why do + {:cached, ttl} -> {:cached, ttl} + _ -> {:req_by, hash} + end + for dep <- SData.Page.refs value do + init_rec_pull(state, dep, sub_why, [peer_id]) + end + end + %{state | reqs: reqs} else state end {:not_found, key} -> - if state.reqs[key] != nil and state.store[key] == nil do + if state.reqs[key] != nil and :ets.lookup state.store, key == [] do nretry = case state.retries[key] do nil -> 1 n -> n+1 @@ -107,6 +165,64 @@ defmodule SApp.BlockStore do end {:noreply, state} end + + def handle_cast({:set_roots, roots}, state) do + cached_why = {:cached, System.os_time(:seconds) + @cache_ttl} + + # Set old roots as cached + for [id, val] <- :ets.match state.store, {:"$1", :root, :"$2"} do + :ets.insert state.store, {id, cached_why, val} + end + for [id] <- :ets.match state.store, {:"$1", :root} do + :ets.insert state.store, {id, cached_why} + end + + # Set old deps as cached + for [id, val] <- :ets.match state.store, {:"$1", {:req_by, :_}, :"$2"} do + :ets.insert state.store, {id, cached_why, val} + end + for [id] <- :ets.match state.store, {:"$1", {:req_by, :_}} do + :ets.insert state.store, {id, cached_why} + end + + # Set new roots as roots + for root <- roots do + case :ets.lookup state.store, root do + [{^root, _, val}] -> + :ets.insert state.store, {root, :root, val} + rec_set_dep state.store, root, val + [{^root, _}] -> + :ets.insert state.store, {root, :root} + [] -> + init_rec_pull state, root, :root, [] + end + end + {:noreply, state} + end + + defp rec_set_dep(store, hash, val0) do + for dep <- SData.Page.refs val0 do + case :ets.lookup store, dep do + [{^dep, _, val}] -> + :ets.insert store, {dep, {:req_by, hash}, val} + rec_set_dep(store, dep, val) + _ -> + :ets.insert store, {dep, {:req_by, hash}} + end + end + end + + def handle_info(:clean_cache, state) do + currtime = System.os_time :seconds + + cache_cleanup_1 = [ {{:_, {:cached, :'$1'}, :_}, [{:<, :'$1', currtime}], [:'$1']} ] + cache_cleanup_2 = [ {{:_, {:cached, :'$1'}}, [{:<, :'$1', currtime}], [:'$1']} ] + :ets.select_delete(state.store, cache_cleanup_1) + :ets.select_delete(state.store, cache_cleanup_2) + + Process.send_after(self(), :clean_cache, @clean_cache_every * 1000) + {:noreply, state} + end def ask_random_peers(state, key) do peers = Shard.Manager.get_shard_peers(state.shard_id) @@ -117,7 +233,6 @@ defmodule SApp.BlockStore do end end - defimpl SData.PageStore do def put(store, page) do hash = GenServer.call(store.pid, {:put, page}) @@ -133,12 +248,7 @@ defmodule SApp.BlockStore do end def copy(store, other_store, hash) do - page = SData.PageStore.get(other_store, hash) - refs = SData.Page.refs(page) - for ref <- refs do - copy(store, other_store, ref) - end - GenServer.call(store.pid, {:put, page}) + GenServer.cast(store.pid, {:rec_pull, hash, other_store.prefer_ask}) store end diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex index 471d8f7..4752dc6 100644 --- a/shard/lib/app/chat.ex +++ b/shard/lib/app/chat.ex @@ -160,6 +160,7 @@ defmodule SApp.Chat do if mst2.root == new_root do # This was the only message missing, we are happy! state = %{state | mst: mst2} + GenServer.cast(state.block_store, {:set_roots, [mst2.root]}) msg_callback(state, msgitem) state else @@ -186,9 +187,19 @@ defmodule SApp.Chat do defp init_merge(state, new_root, source_peer) do # TODO: make the merge asynchronous + prev_last = for {x, true} <- MST.last(state.mst, nil, 100), into: MapSet.new, do: x + mgmst = %{state.mst | root: new_root} mgmst = put_in(mgmst.store.prefer_ask, [source_peer]) - mst = MST.merge(state.mst, mgmst, fn msgitem, true -> msg_callback(state, msgitem) end) + mst = MST.merge(state.mst, mgmst) + + for {x, true} <- MST.last(mst, nil, 100) do + if not MapSet.member? prev_last, x do + msg_callback(state, x) + end + end + + GenServer.cast(state.block_store, {:set_roots, [mst.root]}) %{state | mst: mst} end diff --git a/shard/lib/cli/cli.ex b/shard/lib/cli/cli.ex index c3afe8f..bf3a555 100644 --- a/shard/lib/cli/cli.ex +++ b/shard/lib/cli/cli.ex @@ -4,6 +4,10 @@ defmodule SCLI do """ def run() do + for {_chid, _manifest, chpid} <- Shard.Manager.list_shards do + GenServer.cast(chpid, {:subscribe, self()}) + end + run(nil) end @@ -64,11 +68,15 @@ defmodule SCLI do end defp handle_command(pid, ["hist"]) do - GenServer.call(pid, {:read_history, nil, 25}) - |> Enum.each(fn {{ts, nick, msg}, true} -> - IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}" - end) - pid + if pid == nil do + IO.puts "Not currently on a channel!" + else + GenServer.call(pid, {:read_history, nil, 25}) + |> Enum.each(fn {{ts, nick, msg}, true} -> + IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}" + end) + pid + end end defp handle_command(_pid, ["join", qchan]) do diff --git a/shard/lib/data/merklesearchtree.ex b/shard/lib/data/merklesearchtree.ex index 039f6ce..49d54a5 100644 --- a/shard/lib/data/merklesearchtree.ex +++ b/shard/lib/data/merklesearchtree.ex @@ -193,21 +193,19 @@ defmodule SData.MerkleSearchTree do The merge is not symmetrical in the sense that: - new pages are added in the store of the first argument - - the callback is called for all items found in the second argument and not the first """ - def merge(to, from, callback \\ fn _, _ -> nil end) do - { store, root } = merge_aux(to, from, to.store, to.root, from.root, callback) + def merge(to, from) do + { store, root } = merge_aux(to, from, to.store, to.root, from.root) %{ to | store: store, root: root } end - defp merge_aux(s1, s2, store, r1, r2, callback) do + defp merge_aux(s1, s2, store, r1, r2) do case {r1, r2} do _ when r1 == r2 -> { store, r1 } {_, nil} -> { store, r1 } {nil, _} -> store = Store.copy(store, s2.store, r2) - rec_callback(store, r2, callback) { store, r2 } _ -> %Page{ level: level1, low: low1, list: lst1 } = Store.get(store, r1) @@ -217,64 +215,49 @@ defmodule SData.MerkleSearchTree do level1 > level2 -> {level1, low1, lst1, r2, []} level2 > level1 -> {level2, r1, [], low2, lst2} end - { store, low, lst } = merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback) + { store, low, lst } = merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2) page = %Page{ level: level, low: low, list: lst } {hash, store} = Store.put(store, page) {store, hash} end end - defp merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback) do + defp merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2) do case {lst1, lst2} do { [], [] } -> - {store, hash} = merge_aux(s1, s2, store, low1, low2, callback) + {store, hash} = merge_aux(s1, s2, store, low1, low2) {store, hash, []} { [], [ {k, v, r} | rst2 ] } -> {low1l, low1h, store} = split(s1, store, low1, k) - {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback) - callback.(k, v) - {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, [], r, rst2, callback) + {store, newlow} = merge_aux(s1, s2, store, low1l, low2) + {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, [], r, rst2) {store, newlow, [ {k, v, newr} | newrst ]} { [ {k, v, r} | rst1 ], [] } -> {low2l, low2h, store} = split(s2, store, low2, k) - {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback) - {store, newr, newrst} = merge_aux_rec(s1, s2, store, r, rst1, low2h, [], callback) + {store, newlow} = merge_aux(s1, s2, store, low1, low2l) + {store, newr, newrst} = merge_aux_rec(s1, s2, store, r, rst1, low2h, []) {store, newlow, [ {k, v, newr} | newrst ]} { [ {k1, v1, r1} | rst1 ], [ {k2, v2, r2} | rst2 ] } -> case s1.cmp.(k1, k2) do :before -> {low2l, low2h, store} = split(s2, store, low2, k1) - {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback) - {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, low2h, lst2, callback) + {store, newlow} = merge_aux(s1, s2, store, low1, low2l) + {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, low2h, lst2) {store, newlow, [ {k1, v1, newr} | newrst ]} :after -> {low1l, low1h, store} = split(s1, store, low1, k2) - {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback) - callback.(k2, v2) - {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, lst1, r2, rst2, callback) + {store, newlow} = merge_aux(s1, s2, store, low1l, low2) + {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, lst1, r2, rst2) {store, newlow, [ {k2, v2, newr} | newrst ]} :duplicate -> - {store, newlow} = merge_aux(s1, s2, store, low1, low2, callback) - newv = s1.merge.(v1, v2) ## TODO: callback here ?? - {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, r2, rst2, callback) + {store, newlow} = merge_aux(s1, s2, store, low1, low2) + newv = s1.merge.(v1, v2) + {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, r2, rst2) {store, newlow, [ {k1, newv, newr} | newrst ]} end end end - defp rec_callback(store, root, callback) do - case root do - nil -> nil - _ -> - %Page{ level: _, low: low, list: lst } = Store.get(store, root) - rec_callback(store, low, callback) - for {k, v, rst} <- lst do - callback.(k, v) - rec_callback(store, rst, callback) - end - end - end - @doc""" Get value for a specific key in search tree. """ diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex index 3f2bddb..8cedce2 100644 --- a/shard/lib/manager.ex +++ b/shard/lib/manager.ex @@ -33,7 +33,7 @@ defmodule Shard.Manager do - :outbox Multi-list of - { peer_id, message } + { peer_id, message, time_inserted } """ diff --git a/shard/test/conn_test.exs b/shard/test/conn_test.exs index 275f6dd..ae43d9d 100644 --- a/shard/test/conn_test.exs +++ b/shard/test/conn_test.exs @@ -6,10 +6,10 @@ defmodule ShardTest.Conn do require Salty.Sign.Ed25519, as: Sign test "crypto connection" do - {:ok, srv_pkey, srv_skey} = Sign.keypair + {srv_pkey, srv_skey} = Shard.Identity.get_keypair {:ok, sess_pkey, sess_skey} = Box.keypair {:ok, challenge} = Salty.Random.buf 32 - {:ok, socket} = :gen_tcp.connect {127,0,0,1}, 4044, [:binary, packet: 2, active: false] + {:ok, socket} = :gen_tcp.connect {127,0,0,1}, 4045, [:binary, packet: 2, active: false] hello = {srv_pkey, sess_pkey, challenge, 0} :gen_tcp.send(socket, :erlang.term_to_binary hello) @@ -50,6 +50,7 @@ defmodule ShardTest.Conn do receive do after 100 -> nil end end + @tag :skip test "connect to chat rooms" do {:ok, pid1} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "test"}) {:ok, pid2} = DynamicSupervisor.start_child(Shard.DynamicSupervisor, {SApp.Chat, "other_test"}) diff --git a/shard/test/mst_test.exs b/shard/test/mst_test.exs index c1758ad..cf5a898 100644 --- a/shard/test/mst_test.exs +++ b/shard/test/mst_test.exs @@ -174,24 +174,4 @@ defmodule ShardTest.MST do assert MST.last(mg1, nil, 2000) == all_items end - test "merkle search tree 8: MST.merge callback" do - items1 = (for i <- 1..1000, do: i*2+40) - items2 = (for i <- 1..1000, do: i*3) - - y = Enum.reduce(items1, %MST{}, fn i, acc -> MST.insert(acc, i) end) - z = Enum.reduce(items2, %MST{}, fn i, acc -> MST.insert(acc, i) end) - - {:ok, cb_called} = Agent.start_link fn -> [] end - - cb = fn i, true -> Agent.update(cb_called, fn x -> [i | x] end) end - mg = MST.merge(y, z, cb) - - cb_vals = Agent.get cb_called, &(&1) - expected = MapSet.difference(MapSet.new(items2), MapSet.new(items1)) - |> MapSet.to_list - |> Enum.sort - |> Enum.reverse - assert expected == cb_vals - end - end diff --git a/shard/test/test_helper.exs b/shard/test/test_helper.exs index e5b6600..e6a4f8e 100644 --- a/shard/test/test_helper.exs +++ b/shard/test/test_helper.exs @@ -1,4 +1,4 @@ -ExUnit.start() +ExUnit.start(exclude: [:skip]) case :gen_tcp.connect('localhost', 4045, []) do {:ok, socket} -> |