From 78e5caa664b860189ea86f95fe68ad5e6705897b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 25 Sep 2018 19:57:35 +0200 Subject: s/block/page, change semantics to use more binaries, fixes --- shard/lib/app/blockstore.ex | 259 ------------------------------------------ shard/lib/app/chat.ex | 10 +- shard/lib/app/pagestore.ex | 269 ++++++++++++++++++++++++++++++++++++++++++++ shard/lib/data/data.ex | 12 ++ 4 files changed, 286 insertions(+), 264 deletions(-) delete mode 100644 shard/lib/app/blockstore.ex create mode 100644 shard/lib/app/pagestore.ex (limited to 'shard') diff --git a/shard/lib/app/blockstore.ex b/shard/lib/app/blockstore.ex deleted file mode 100644 index 077235a..0000000 --- a/shard/lib/app/blockstore.ex +++ /dev/null @@ -1,259 +0,0 @@ -defmodule SApp.BlockStore do - @moduledoc """ - A module that implements a content-adressable storage (blocks, or pages, - identified by the hash of their contents). - - This is not a shard, it is a side process that a shard may use to store its data. - - Uses an ETS table of: - - { block_id, why_have_it } -- waiting for data - { block_id, why_have_it, data } -- once we have the data - - why_have_it := :root - | {:req_by, some_other_block_id} - | {:cached, expiry_date} - """ - - use GenServer - - @enforce_keys [:pid] - defstruct [:pid, :prefer_ask] - - @cache_ttl 600 # Ten minutes - @clean_cache_every 60 # One minute - - defmodule State do - defstruct [:shard_id, :path, :store, :reqs, :retries] - end - - - def start_link(shard_id, path) do - GenServer.start_link(__MODULE__, [shard_id, path]) - end - - def init([shard_id, path]) do - Shard.Manager.dispatch_to(shard_id, path, self()) - - store_path = String.to_atom "#{Base.encode16 shard_id}/#{Atom.to_string path}" - store = :ets.new store_path, [:set, :protected] - - Process.send_after(self(), :clean_cache, @clean_cache_every * 1000) - - {:ok, %State{shard_id: shard_id, path: path, store: store, reqs: %{}, retries: %{}}} - end - - - def handle_call({:get, key, prefer_ask}, from, state) do - case :ets.lookup state.store, key do - [{_, _, v}] -> - {:reply, v, state} - [{_, _}] -> - state = add_request(state, key, from) - {:noreply, state} - [] -> - why = {:cached, System.os_time(:seconds) + @cache_ttl} - init_rec_pull(state, key, why, prefer_ask) - state = add_request(state, key, from) - {:noreply, state} - end - end - - def handle_call({:put, val}, _from, state) do - hash = SData.term_hash val - store_put(state, hash, val) - {:reply, hash, state} - end - - defp add_request(state, key, from) do - reqs_key = case state.reqs[key] do - nil -> - MapSet.put(MapSet.new(), from) - ms -> - MapSet.put(ms, from) - end - put_in(state.reqs[key], reqs_key) - end - - defp store_put(state, hash, val) do - case :ets.lookup state.store, hash do - [] -> - :ets.insert state.store, {hash, {:cached, System.os_time(:seconds) + @cache_ttl}, val} - nil - [{_, why}] -> - :ets.insert state.store, {hash, why, val} - why - [{_, _, _}] -> - nil - end - end - - defp init_rec_pull(state, key, why, prefer_ask) do - case prefer_ask do - [_ | _] -> - for peer <- prefer_ask do - Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) - end - _ -> - ask_random_peers(state, key) - end - :ets.insert state.store, {key, why} - end - - def handle_cast({:rec_pull, hash, ask_to}, state) do - if :ets.lookup state.store, hash == [] do - why = {:cached, System.os_time(:seconds) + @cache_ttl} - init_rec_pull(state, hash, why, ask_to) - end - {:noreply, state} - end - - def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do - state = case msg do - {:get, key} -> - case :ets.lookup state.store, key do - [{_, _, v}] -> - Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, v}}) - _ -> - Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}}) - end - state - {:info, hash, value} -> - if SData.term_hash value == hash do - reqs = case state.reqs[hash] do - nil -> state.reqs - pids -> - for pid <- pids do - GenServer.reply(pid, value) - end - Map.delete(state.reqs, hash) - end - state = %{state | retries: Map.delete(state.retries, hash)} - rec_why = store_put state, hash, value - if rec_why != nil do - sub_why = case rec_why do - {:cached, ttl} -> {:cached, ttl} - _ -> {:req_by, hash} - end - for dep <- SData.Page.refs value do - init_rec_pull(state, dep, sub_why, [peer_id]) - end - end - %{state | reqs: reqs} - else - state - end - {:not_found, key} -> - if state.reqs[key] != nil and :ets.lookup state.store, key == [] do - nretry = case state.retries[key] do - nil -> 1 - n -> n+1 - end - if nretry < 3 do - ask_random_peers(state, key) - %{state | retries: Map.put(state.retries, key, nretry)} - else - for pid <- state.reqs[key] do - GenServer.reply(pid, nil) - end - state = %{state | reqs: Map.delete(state.reqs, key)} - state = %{state | retries: Map.delete(state.retries, key)} - state - end - else - state - end - end - {:noreply, state} - end - - def handle_cast({:set_roots, roots}, state) do - cached_why = {:cached, System.os_time(:seconds) + @cache_ttl} - - # Set old roots as cached - for [id, val] <- :ets.match state.store, {:"$1", :root, :"$2"} do - :ets.insert state.store, {id, cached_why, val} - end - for [id] <- :ets.match state.store, {:"$1", :root} do - :ets.insert state.store, {id, cached_why} - end - - # Set old deps as cached - for [id, val] <- :ets.match state.store, {:"$1", {:req_by, :_}, :"$2"} do - :ets.insert state.store, {id, cached_why, val} - end - for [id] <- :ets.match state.store, {:"$1", {:req_by, :_}} do - :ets.insert state.store, {id, cached_why} - end - - # Set new roots as roots - for root <- roots do - case :ets.lookup state.store, root do - [{^root, _, val}] -> - :ets.insert state.store, {root, :root, val} - rec_set_dep state.store, root, val - [{^root, _}] -> - :ets.insert state.store, {root, :root} - [] -> - init_rec_pull state, root, :root, [] - end - end - {:noreply, state} - end - - defp rec_set_dep(store, hash, val0) do - for dep <- SData.Page.refs val0 do - case :ets.lookup store, dep do - [{^dep, _, val}] -> - :ets.insert store, {dep, {:req_by, hash}, val} - rec_set_dep(store, dep, val) - _ -> - :ets.insert store, {dep, {:req_by, hash}} - end - end - end - - def handle_info(:clean_cache, state) do - currtime = System.os_time :seconds - - cache_cleanup = [ {{:_, {:cached, :'$1'}, :_}, [{:<, :'$1', currtime}], [true]}, - {{:_, {:cached, :'$1'}}, [{:<, :'$1', currtime}], [true]} ] - :ets.select_delete(state.store, cache_cleanup) - - Process.send_after(self(), :clean_cache, @clean_cache_every * 1000) - {:noreply, state} - end - - def ask_random_peers(state, key) do - peers = Shard.Manager.get_shard_peers(state.shard_id) - |> Enum.shuffle - |> Enum.take(3) - for peer <- peers do - Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) - end - end - - defimpl SData.PageStore do - def put(store, page) do - hash = GenServer.call(store.pid, {:put, page}) - { hash, store } - end - - def get(store, hash) do - try do - GenServer.call(store.pid, {:get, hash, store.prefer_ask}) - catch - :exit, {:timeout, _} -> nil - end - end - - def copy(store, other_store, hash) do - GenServer.cast(store.pid, {:rec_pull, hash, other_store.prefer_ask}) - store - end - - def free(store, _hash) do - store ## DO SOMETHING??? - end - end -end diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex index 4752dc6..fa62c9e 100644 --- a/shard/lib/app/chat.ex +++ b/shard/lib/app/chat.ex @@ -50,15 +50,15 @@ defmodule SApp.Chat do case Shard.Manager.register(id, manifest, self()) do :ok -> Shard.Manager.dispatch_to(id, nil, self()) - {:ok, block_store} = SApp.BlockStore.start_link(id, :block_store) - mst = %MST{store: %SApp.BlockStore{pid: block_store}, + {:ok, page_store} = SApp.PageStore.start_link(id, :page_store) + mst = %MST{store: %SApp.PageStore{pid: page_store}, cmp: &msg_cmp/2} GenServer.cast(self(), :init_pull) {:ok, %{channel: channel, id: id, manifest: manifest, - block_store: block_store, + page_store: page_store, mst: mst, subs: MapSet.new, } @@ -160,7 +160,7 @@ defmodule SApp.Chat do if mst2.root == new_root do # This was the only message missing, we are happy! state = %{state | mst: mst2} - GenServer.cast(state.block_store, {:set_roots, [mst2.root]}) + GenServer.cast(state.page_store, {:set_roots, [mst2.root]}) msg_callback(state, msgitem) state else @@ -199,7 +199,7 @@ defmodule SApp.Chat do end end - GenServer.cast(state.block_store, {:set_roots, [mst.root]}) + GenServer.cast(state.page_store, {:set_roots, [mst.root]}) %{state | mst: mst} end diff --git a/shard/lib/app/pagestore.ex b/shard/lib/app/pagestore.ex new file mode 100644 index 0000000..5f2ba54 --- /dev/null +++ b/shard/lib/app/pagestore.ex @@ -0,0 +1,269 @@ +defmodule SApp.PageStore do + @moduledoc """ + A module that implements a content-adressable storage (pages, + identified by the hash of their contents). + + This is not a shard, it is a side process that a shard may use to store its data. + + Uses an ETS table of: + + { page_id, why_have_it } -- waiting for data + { page_id, why_have_it, data } -- once we have the data + + why_have_it := :root + | {:req_by, some_other_page_id} + | {:cached, expiry_date} + """ + + use GenServer + + @enforce_keys [:pid] + defstruct [:pid, :prefer_ask] + + @cache_ttl 600 # Ten minutes + @clean_cache_every 60 # One minute + @max_failures 4 # Maximum of peers that reply not_found before we abandon + + defmodule State do + defstruct [:shard_id, :path, :store, :reqs, :retries] + end + + + def start_link(shard_id, path) do + GenServer.start_link(__MODULE__, [shard_id, path]) + end + + def init([shard_id, path]) do + Shard.Manager.dispatch_to(shard_id, path, self()) + + store_path = String.to_atom "#{Base.encode16 shard_id}/#{Atom.to_string path}" + store = :ets.new store_path, [:set, :protected] + + Process.send_after(self(), :clean_cache, @clean_cache_every * 1000) + + {:ok, %State{shard_id: shard_id, path: path, store: store, reqs: %{}, retries: %{}}} + end + + + def handle_call({:get, key, prefer_ask}, from, state) do + case :ets.lookup state.store, key do + [{_, _, bin}] -> + {:reply, bin, state} + [{_, _}] -> + state = add_request(state, key, from) + {:noreply, state} + [] -> + why = {:cached, System.os_time(:seconds) + @cache_ttl} + init_rec_pull(state, key, why, prefer_ask) + state = add_request(state, key, from) + {:noreply, state} + end + end + + def handle_call({:put, bin}, _from, state) do + hash = SData.bin_hash bin + store_put(state, hash, bin) + {:reply, hash, state} + end + + defp add_request(state, key, from) do + reqs_key = case state.reqs[key] do + nil -> + MapSet.put(MapSet.new(), from) + ms -> + MapSet.put(ms, from) + end + put_in(state.reqs[key], reqs_key) + end + + defp store_put(state, hash, bin) do + case :ets.lookup state.store, hash do + [] -> + :ets.insert state.store, {hash, {:cached, System.os_time(:seconds) + @cache_ttl}, bin} + nil + [{_, why}] -> + :ets.insert state.store, {hash, why, bin} + why + [{_, _, _}] -> + nil + end + end + + defp init_rec_pull(state, key, why, prefer_ask) do + case prefer_ask do + [] -> + ask_random_peers(state, key) + _ -> + for peer <- prefer_ask do + Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) + end + end + :ets.insert state.store, {key, why} + end + + def handle_cast({:rec_pull, hash, ask_to}, state) do + if :ets.lookup state.store, hash == [] do + why = {:cached, System.os_time(:seconds) + @cache_ttl} + init_rec_pull(state, hash, why, ask_to) + end + {:noreply, state} + end + + def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do + state = case msg do + {:get, key} -> + case :ets.lookup state.store, key do + [{_, _, bin}] -> + Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, bin}}) + _ -> + Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}}) + end + state + {:info, hash, bin} -> + already_have_it = case :ets.lookup state.store, hash do + [{_, _, _}] -> true + _ -> false + end + if SData.bin_hash bin == hash and not already_have_it do + reqs = case state.reqs[hash] do + nil -> state.reqs + pids -> + for pid <- pids do + GenServer.reply(pid, bin) + end + Map.delete(state.reqs, hash) + end + state = %{state | reqs: reqs, retries: Map.delete(state.retries, hash)} + rec_why = store_put(state, hash, bin) + if rec_why != nil do + sub_why = case rec_why do + {:cached, ttl} -> {:cached, ttl} + _ -> {:req_by, hash} + end + value = SData.term_unbin bin + for dep <- SData.Page.refs value do + if :ets.lookup state.store, dep == [] do + init_rec_pull(state, dep, sub_why, [peer_id]) + end + end + end + state + else + state + end + {:not_found, key} -> + if state.reqs[key] != nil do + nretry = case state.retries[key] do + nil -> 1 + n -> n+1 + end + if nretry < @max_failures do + ask_random_peers(state, key) + %{state | retries: Map.put(state.retries, key, nretry)} + else + for pid <- state.reqs[key] do + GenServer.reply(pid, nil) + end + state = %{state | reqs: Map.delete(state.reqs, key)} + state = %{state | retries: Map.delete(state.retries, key)} + state + end + else + state + end + end + {:noreply, state} + end + + def handle_cast({:set_roots, roots}, state) do + cached_why = {:cached, System.os_time(:seconds) + @cache_ttl} + + # Set old roots and their deps as cached + for ent <- :ets.select state.store, [{ {:"$1", :root, :"$2"}, [], [:"$$"] }, + { {:"$1", :root}, [], [:"$$"] }, + { {:"$1", {:req_by, :_}, :"$2"}, [], [:"$$"] }, + { {:"$1", {:req_by, :_}}, [], [:"$$"] }] + do + case ent do + [id, bin] -> + :ets.insert state.store, {id, cached_why, bin} + [id] -> + :ets.insert state.store, {id, cached_why} + end + end + + # Set new roots as roots + for root <- roots do + case :ets.lookup state.store, root do + [{^root, _, bin}] -> + :ets.insert state.store, {root, :root, bin} + rec_set_dep(state, root, SData.term_unbin bin) + [{^root, _}] -> + :ets.insert state.store, {root, :root} + [] -> + init_rec_pull state, root, :root, [] + end + end + {:noreply, state} + end + + defp rec_set_dep(state, hash, val0) do + for dep <- SData.Page.refs val0 do + case :ets.lookup state.store, dep do + [{^dep, _, bin}] -> + :ets.insert state.store, {dep, {:req_by, hash}, bin} + rec_set_dep(state, dep, SData.term_unbin bin) + [{^dep, _}] -> + :ets.insert state.store, {dep, {:req_by, hash}} + [] -> + init_rec_pull state, dep, {:req_by, hash}, [] + end + end + end + + def handle_info(:clean_cache, state) do + currtime = System.os_time :seconds + + cache_cleanup = [ {{:_, {:cached, :'$1'}, :_}, [{:<, :'$1', currtime}], [true]}, + {{:_, {:cached, :'$1'}}, [{:<, :'$1', currtime}], [true]} ] + :ets.select_delete(state.store, cache_cleanup) + + Process.send_after(self(), :clean_cache, @clean_cache_every * 1000) + {:noreply, state} + end + + def ask_random_peers(state, key) do + peers = Shard.Manager.get_shard_peers(state.shard_id) + |> Enum.shuffle + |> Enum.take(3) + for peer <- peers do + Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) + end + end + + defimpl SData.PageStore do + def put(store, page) do + bin = SData.term_bin page + hash = GenServer.call(store.pid, {:put, bin}) + { hash, store } + end + + def get(store, hash) do + try do + bin = GenServer.call(store.pid, {:get, hash, store.prefer_ask}) + SData.term_unbin bin + catch + :exit, {:timeout, _} -> nil + end + end + + def copy(store, other_store, hash) do + GenServer.cast(store.pid, {:rec_pull, hash, other_store.prefer_ask}) + store + end + + def free(store, _hash) do + store ## DO SOMETHING??? + end + end +end diff --git a/shard/lib/data/data.ex b/shard/lib/data/data.ex index c2c659d..78c73cd 100644 --- a/shard/lib/data/data.ex +++ b/shard/lib/data/data.ex @@ -18,6 +18,18 @@ defmodule SData do :crypto.hash(algo, (:erlang.term_to_binary term)) end + def term_bin(term) do + :erlang.term_to_binary term + end + + def bin_hash(bin, algo \\ :sha256) do + :crypto.hash(algo, bin) + end + + def term_unbin(bin) do + :erlang.binary_to_term(bin, [:safe]) + end + @doc""" Compare function for arbitrary terms using the Erlang order """ -- cgit v1.2.3