diff options
Diffstat (limited to 'shard')
-rw-r--r-- | shard/lib/app/chat.ex | 10 | ||||
-rw-r--r-- | shard/lib/app/pagestore.ex (renamed from shard/lib/app/blockstore.ex) | 116 | ||||
-rw-r--r-- | shard/lib/data/data.ex | 12 |
3 files changed, 80 insertions, 58 deletions
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex index 4752dc6..fa62c9e 100644 --- a/shard/lib/app/chat.ex +++ b/shard/lib/app/chat.ex @@ -50,15 +50,15 @@ defmodule SApp.Chat do case Shard.Manager.register(id, manifest, self()) do :ok -> Shard.Manager.dispatch_to(id, nil, self()) - {:ok, block_store} = SApp.BlockStore.start_link(id, :block_store) - mst = %MST{store: %SApp.BlockStore{pid: block_store}, + {:ok, page_store} = SApp.PageStore.start_link(id, :page_store) + mst = %MST{store: %SApp.PageStore{pid: page_store}, cmp: &msg_cmp/2} GenServer.cast(self(), :init_pull) {:ok, %{channel: channel, id: id, manifest: manifest, - block_store: block_store, + page_store: page_store, mst: mst, subs: MapSet.new, } @@ -160,7 +160,7 @@ defmodule SApp.Chat do if mst2.root == new_root do # This was the only message missing, we are happy! state = %{state | mst: mst2} - GenServer.cast(state.block_store, {:set_roots, [mst2.root]}) + GenServer.cast(state.page_store, {:set_roots, [mst2.root]}) msg_callback(state, msgitem) state else @@ -199,7 +199,7 @@ defmodule SApp.Chat do end end - GenServer.cast(state.block_store, {:set_roots, [mst.root]}) + GenServer.cast(state.page_store, {:set_roots, [mst.root]}) %{state | mst: mst} end diff --git a/shard/lib/app/blockstore.ex b/shard/lib/app/pagestore.ex index 077235a..5f2ba54 100644 --- a/shard/lib/app/blockstore.ex +++ b/shard/lib/app/pagestore.ex @@ -1,17 +1,17 @@ -defmodule SApp.BlockStore do +defmodule SApp.PageStore do @moduledoc """ - A module that implements a content-adressable storage (blocks, or pages, + A module that implements a content-adressable storage (pages, identified by the hash of their contents). This is not a shard, it is a side process that a shard may use to store its data. Uses an ETS table of: - { block_id, why_have_it } -- waiting for data - { block_id, why_have_it, data } -- once we have the data + { page_id, why_have_it } -- waiting for data + { page_id, why_have_it, data } -- once we have the data why_have_it := :root - | {:req_by, some_other_block_id} + | {:req_by, some_other_page_id} | {:cached, expiry_date} """ @@ -22,6 +22,7 @@ defmodule SApp.BlockStore do @cache_ttl 600 # Ten minutes @clean_cache_every 60 # One minute + @max_failures 4 # Maximum of peers that reply not_found before we abandon defmodule State do defstruct [:shard_id, :path, :store, :reqs, :retries] @@ -46,8 +47,8 @@ defmodule SApp.BlockStore do def handle_call({:get, key, prefer_ask}, from, state) do case :ets.lookup state.store, key do - [{_, _, v}] -> - {:reply, v, state} + [{_, _, bin}] -> + {:reply, bin, state} [{_, _}] -> state = add_request(state, key, from) {:noreply, state} @@ -59,9 +60,9 @@ defmodule SApp.BlockStore do end end - def handle_call({:put, val}, _from, state) do - hash = SData.term_hash val - store_put(state, hash, val) + def handle_call({:put, bin}, _from, state) do + hash = SData.bin_hash bin + store_put(state, hash, bin) {:reply, hash, state} end @@ -75,13 +76,13 @@ defmodule SApp.BlockStore do put_in(state.reqs[key], reqs_key) end - defp store_put(state, hash, val) do + defp store_put(state, hash, bin) do case :ets.lookup state.store, hash do [] -> - :ets.insert state.store, {hash, {:cached, System.os_time(:seconds) + @cache_ttl}, val} + :ets.insert state.store, {hash, {:cached, System.os_time(:seconds) + @cache_ttl}, bin} nil [{_, why}] -> - :ets.insert state.store, {hash, why, val} + :ets.insert state.store, {hash, why, bin} why [{_, _, _}] -> nil @@ -90,12 +91,12 @@ defmodule SApp.BlockStore do defp init_rec_pull(state, key, why, prefer_ask) do case prefer_ask do - [_ | _] -> + [] -> + ask_random_peers(state, key) + _ -> for peer <- prefer_ask do Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) end - _ -> - ask_random_peers(state, key) end :ets.insert state.store, {key, why} end @@ -112,44 +113,51 @@ defmodule SApp.BlockStore do state = case msg do {:get, key} -> case :ets.lookup state.store, key do - [{_, _, v}] -> - Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, v}}) + [{_, _, bin}] -> + Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, bin}}) _ -> Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}}) end state - {:info, hash, value} -> - if SData.term_hash value == hash do + {:info, hash, bin} -> + already_have_it = case :ets.lookup state.store, hash do + [{_, _, _}] -> true + _ -> false + end + if SData.bin_hash bin == hash and not already_have_it do reqs = case state.reqs[hash] do nil -> state.reqs pids -> for pid <- pids do - GenServer.reply(pid, value) + GenServer.reply(pid, bin) end Map.delete(state.reqs, hash) end - state = %{state | retries: Map.delete(state.retries, hash)} - rec_why = store_put state, hash, value + state = %{state | reqs: reqs, retries: Map.delete(state.retries, hash)} + rec_why = store_put(state, hash, bin) if rec_why != nil do sub_why = case rec_why do {:cached, ttl} -> {:cached, ttl} _ -> {:req_by, hash} end + value = SData.term_unbin bin for dep <- SData.Page.refs value do - init_rec_pull(state, dep, sub_why, [peer_id]) + if :ets.lookup state.store, dep == [] do + init_rec_pull(state, dep, sub_why, [peer_id]) + end end end - %{state | reqs: reqs} + state else state end {:not_found, key} -> - if state.reqs[key] != nil and :ets.lookup state.store, key == [] do + if state.reqs[key] != nil do nretry = case state.retries[key] do nil -> 1 n -> n+1 end - if nretry < 3 do + if nretry < @max_failures do ask_random_peers(state, key) %{state | retries: Map.put(state.retries, key, nretry)} else @@ -170,28 +178,26 @@ defmodule SApp.BlockStore do def handle_cast({:set_roots, roots}, state) do cached_why = {:cached, System.os_time(:seconds) + @cache_ttl} - # Set old roots as cached - for [id, val] <- :ets.match state.store, {:"$1", :root, :"$2"} do - :ets.insert state.store, {id, cached_why, val} - end - for [id] <- :ets.match state.store, {:"$1", :root} do - :ets.insert state.store, {id, cached_why} - end - - # Set old deps as cached - for [id, val] <- :ets.match state.store, {:"$1", {:req_by, :_}, :"$2"} do - :ets.insert state.store, {id, cached_why, val} - end - for [id] <- :ets.match state.store, {:"$1", {:req_by, :_}} do - :ets.insert state.store, {id, cached_why} + # Set old roots and their deps as cached + for ent <- :ets.select state.store, [{ {:"$1", :root, :"$2"}, [], [:"$$"] }, + { {:"$1", :root}, [], [:"$$"] }, + { {:"$1", {:req_by, :_}, :"$2"}, [], [:"$$"] }, + { {:"$1", {:req_by, :_}}, [], [:"$$"] }] + do + case ent do + [id, bin] -> + :ets.insert state.store, {id, cached_why, bin} + [id] -> + :ets.insert state.store, {id, cached_why} + end end # Set new roots as roots for root <- roots do case :ets.lookup state.store, root do - [{^root, _, val}] -> - :ets.insert state.store, {root, :root, val} - rec_set_dep state.store, root, val + [{^root, _, bin}] -> + :ets.insert state.store, {root, :root, bin} + rec_set_dep(state, root, SData.term_unbin bin) [{^root, _}] -> :ets.insert state.store, {root, :root} [] -> @@ -201,14 +207,16 @@ defmodule SApp.BlockStore do {:noreply, state} end - defp rec_set_dep(store, hash, val0) do + defp rec_set_dep(state, hash, val0) do for dep <- SData.Page.refs val0 do - case :ets.lookup store, dep do - [{^dep, _, val}] -> - :ets.insert store, {dep, {:req_by, hash}, val} - rec_set_dep(store, dep, val) - _ -> - :ets.insert store, {dep, {:req_by, hash}} + case :ets.lookup state.store, dep do + [{^dep, _, bin}] -> + :ets.insert state.store, {dep, {:req_by, hash}, bin} + rec_set_dep(state, dep, SData.term_unbin bin) + [{^dep, _}] -> + :ets.insert state.store, {dep, {:req_by, hash}} + [] -> + init_rec_pull state, dep, {:req_by, hash}, [] end end end @@ -235,13 +243,15 @@ defmodule SApp.BlockStore do defimpl SData.PageStore do def put(store, page) do - hash = GenServer.call(store.pid, {:put, page}) + bin = SData.term_bin page + hash = GenServer.call(store.pid, {:put, bin}) { hash, store } end def get(store, hash) do try do - GenServer.call(store.pid, {:get, hash, store.prefer_ask}) + bin = GenServer.call(store.pid, {:get, hash, store.prefer_ask}) + SData.term_unbin bin catch :exit, {:timeout, _} -> nil end diff --git a/shard/lib/data/data.ex b/shard/lib/data/data.ex index c2c659d..78c73cd 100644 --- a/shard/lib/data/data.ex +++ b/shard/lib/data/data.ex @@ -18,6 +18,18 @@ defmodule SData do :crypto.hash(algo, (:erlang.term_to_binary term)) end + def term_bin(term) do + :erlang.term_to_binary term + end + + def bin_hash(bin, algo \\ :sha256) do + :crypto.hash(algo, bin) + end + + def term_unbin(bin) do + :erlang.binary_to_term(bin, [:safe]) + end + @doc""" Compare function for arbitrary terms using the Erlang order """ |