diff options
-rw-r--r-- | lib/app/blockstore.ex | 145 | ||||
-rw-r--r-- | lib/app/blockstore.ex_ | 78 | ||||
-rw-r--r-- | lib/app/chat.ex | 86 | ||||
-rw-r--r-- | lib/cli/cli.ex | 11 | ||||
-rw-r--r-- | lib/data/merklesearchtree.ex | 109 | ||||
-rw-r--r-- | test/mst_test.exs | 20 |
6 files changed, 318 insertions, 131 deletions
diff --git a/lib/app/blockstore.ex b/lib/app/blockstore.ex new file mode 100644 index 0000000..1523a44 --- /dev/null +++ b/lib/app/blockstore.ex @@ -0,0 +1,145 @@ +defmodule SApp.BlockStore do + @moduledoc """ + A module that implements a content-adressable storage (blocks, or pages, + identified by the hash of their contents). + + This is not a shard, it is a side process that a shard may use to store its data. + + TODO: WIP + """ + + use GenServer + + @enforce_keys [:pid] + defstruct [:pid, :prefer_ask] + + + defmodule State do + defstruct [:shard_id, :path, :store, :reqs, :retries] + end + + + def start_link(shard_id, path) do + GenServer.start_link(__MODULE__, [shard_id, path]) + end + + def init([shard_id, path]) do + Shard.Manager.dispatch_to(shard_id, path, self()) + + {:ok, %State{shard_id: shard_id, path: path, store: %{}, reqs: %{}, retries: %{}}} + end + + def handle_call({:get, key, prefer_ask}, from, state) do + case state.store[key] do + nil -> + case prefer_ask do + [_ | _] -> + for peer <- prefer_ask do + Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) + end + _ -> + ask_random_peers(state, key) + end + reqs_key = case state.reqs[key] do + nil -> + MapSet.put(MapSet.new(), from) + ms -> + MapSet.put(ms, from) + end + state = put_in(state.reqs[key], reqs_key) + {:noreply, state} + v -> + {:reply, v, state} + end + end + + def handle_call({:put, val}, _from, state) do + hash = SData.term_hash val + state = %{state | store: Map.put(state.store, hash, val)} + {:reply, hash, state} + end + + def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do + state = case msg do + {:get, key} -> + case state.store[key] do + nil -> + Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}}) + v -> + Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, v}}) + end + state + {:info, hash, value} -> + if SData.term_hash value == hash do + reqs = case state.reqs[hash] do + nil -> state.reqs + pids -> + for pid <- pids do + GenServer.reply(pid, value) + end + Map.delete(state.reqs, hash) + end + state = %{state | retries: Map.delete(state.retries, hash)} + %{state | store: Map.put(state.store, hash, value), reqs: reqs} + else + state + end + {:not_found, key} -> + if state.reqs[key] != nil and state.store[key] == nil do + nretry = case state.retries[key] do + nil -> 1 + n -> n+1 + end + if nretry < 3 do + ask_random_peers(state, key) + %{state | retries: Map.put(state.retries, key, nretry)} + else + for pid <- state.reqs[key] do + GenServer.reply(pid, nil) + end + state = %{state | reqs: Map.delete(state.reqs, key)} + state = %{state | retries: Map.delete(state.retries, key)} + state + end + else + state + end + end + {:noreply, state} + end + + def ask_random_peers(state, key) do + peers = :ets.lookup(:shard_peer_db, state.shard_id) + |> Enum.shuffle + |> Enum.take(3) + for peer <- peers do + Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) + end + end + + + defimpl SData.PageStore do + def put(store, page) do + hash = GenServer.call(store.pid, {:put, page}) + { hash, store } + end + + def get(store, hash) do + GenServer.call(store.pid, {:get, hash, store.prefer_ask}) + end + + def copy(store, other_store, hash) do + page = SData.PageStore.get(other_store, hash) + refs = SData.Page.refs(page) + for ref <- refs do + copy(store, other_store, ref) + end + GenServer.call(store.pid, {:put, page}) + store + end + + def free(store, _hash) do + store ## DO SOMETHING??? + end + end +end diff --git a/lib/app/blockstore.ex_ b/lib/app/blockstore.ex_ deleted file mode 100644 index 2854161..0000000 --- a/lib/app/blockstore.ex_ +++ /dev/null @@ -1,78 +0,0 @@ -defmodule SApp.BlockStore do - @moduledoc """ - A module that implements a content-adressable storage (blocks, or pages, - identified by the hash of their contents). - - Establishes full node connectivity and uses rendez-vous hashing to select - which nodes are responsible of a given hash. - - TODO: WIP - """ - - use GenServer - - defmodule State do - defstruct [:name, :id, :manifest, - :ncopies, - :store, :peers] - end - - - def start_link(name) do - GenServer.start_link(__MODULE__, name) - end - - def init(name) do - manifest = {:blockstore, name} - id = SData.term_hash manifest - - GenServer.cast(Shard.Manager, {:register, id, self()}) - GenServer.cast(self(), :init_pull) - - {:ok, %State{name: name, id: id, manifest: manifest, - ncopies: 3, - store: %{}, peers: %{}}} - end - - def handle_call(:manifest, _from, state) do - {:reply, state.manifest, state} - end - - def handle_call({:get, key}, from, state) do - # TODO - end - - def handle_call({:put, val}, state) do - # TODO - end - - def handle_cast({:redundant, _}, _state) do - exit :normal - end - - def handle_cast(:init_pull, state) do - GenServer.call(SNet.Manager, :get_all) - |> Enum.each(&(GenServer.cast(&1, {:send_msg, {:interested, [state.id]}}))) - {:noreply, state} - end - - def handle_cast({:interested, peer_id, peer_pid}, state) do - new_peers = Map.put(state.peers, peer_id, peer_pid) - new_state = %{ state | peers: new_peers } - initial_sync(new_state, peer_id, peer_pid) - {:noreply, new_state} - end - - def handle_cast({:msg, peer_id, peer_pid, msg}, state) do - # TODO - {:noreply, state} - end - - defp initial_sync(state, peer_id, peer_pid) do - # TODO - end - - defp send(state, to, msg) do - GenServer.cast(to, {:send_msg, {state.id, msg}}) - end -end diff --git a/lib/app/chat.ex b/lib/app/chat.ex index e28e896..051fab6 100644 --- a/lib/app/chat.ex +++ b/lib/app/chat.ex @@ -19,7 +19,8 @@ defmodule SApp.Chat do use GenServer - alias SData.MerkleList, as: ML + require Logger + alias SData.MerkleSearchTree, as: MST @doc """ Start a process that connects to a given channel @@ -32,19 +33,22 @@ defmodule SApp.Chat do Initialize channel process. """ def init(channel) do - store = ML.new(&msg_cmp/2) manifest = {:chat, channel} id = SData.term_hash manifest case Shard.Manager.register(id, manifest, self()) do :ok -> Shard.Manager.dispatch_to(id, nil, self()) + {:ok, block_store} = SApp.BlockStore.start_link(id, :block_store) + mst = %MST{store: %SApp.BlockStore{pid: block_store}, + cmp: &msg_cmp/2} GenServer.cast(self(), :init_pull) {:ok, %{channel: channel, id: id, manifest: manifest, - store: store, + block_store: block_store, + mst: mst, subs: MapSet.new, } } @@ -60,8 +64,8 @@ defmodule SApp.Chat do {:reply, state.manifest, state} end - def handle_call({:read_history, start, num}, _from, state) do - ret = ML.read(state.store, start, num) + def handle_call({:read_history, top_bound, num}, _from, state) do + ret = MST.last(state.mst, top_bound, num) {:reply, ret, state} end @@ -86,7 +90,9 @@ defmodule SApp.Chat do msgitem = {(System.os_time :seconds), Shard.Identity.get_nickname(), msg} - new_state = %{state | store: ML.insert(state.store, msgitem)} + prev_root = state.mst.root + mst = MST.insert(state.mst, msgitem) + state = %{state | mst: mst} for pid <- state.subs do if Process.alive?(pid) do @@ -94,11 +100,12 @@ defmodule SApp.Chat do end end + notif = {:append, prev_root, msgitem, mst.root} for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do - push_messages(new_state, peer_id, nil, 5) + Shard.Manager.send(peer_id, {state.id, nil, notif}) end - {:noreply, new_state} + {:noreply, state} end @doc """ @@ -106,7 +113,7 @@ defmodule SApp.Chat do connected to asks to recieve data for this channel. """ def handle_cast({:interested, peer_id}, state) do - push_messages(state, peer_id, nil, 10) + Shard.Manager.send(peer_id, {state.id, nil, {:root, state.mst.root}}) {:noreply, state} end @@ -126,27 +133,52 @@ defmodule SApp.Chat do Merkle hash of the store of older messages. """ def handle_cast({:msg, peer_id, _shard_id, nil, msg}, state) do - case msg do + state = case msg do {:get_manifest} -> Shard.Manager.send(peer_id, {state.id, nil, {:manifest, state.manifest}}) - {:get, start} -> push_messages(state, peer_id, start, 20) - {:info, _start, list, rest} -> - if rest != nil and not ML.has(state.store, rest) do - Shard.Manager.send(peer_id, {state.id, nil, {:get, rest}}) + state + {:append, prev_root, msgitem, new_root} -> + # Append message: one single mesage has arrived + if new_root == state.mst.root do + # We already have the message, do nothing + state + else + # Try adding the message + if prev_root == state.mst.root do + mst2 = MST.insert(state.mst, msgitem) + if mst2.root == new_root do + # This was the only message missing, we are happy! + state = %{state | mst: mst2} + msg_callback(state, msgitem) + state + else + # More messages are missing, start a full merge + init_merge(state, new_root, peer_id) + end + else + init_merge(state, new_root, peer_id) + end + end + {:root, new_root} -> + if new_root == state.mst.root do + # already up to date, ignore + state + else + init_merge(state, new_root, peer_id) end - who = self() - spawn_link(fn -> - Process.sleep 1000 - GenServer.cast(who, {:deferred_insert, list}) - end) - _ -> nil + x -> + Logger.info("Unhandled message: #{inspect x}") + state end {:noreply, state} end - def handle_cast({:deferred_insert, list}, state) do - new_store = ML.insert_many(state.store, list, (fn msg -> msg_callback(state, msg) end)) - {:noreply, %{state | store: new_store}} + defp init_merge(state, new_root, source_peer) do + # TODO: make the merge asynchronous + mgmst = %{state.mst | root: new_root} + mgmst = put_in(mgmst.store.prefer_ask, [source_peer]) + mst = MST.merge(state.mst, mgmst, fn msgitem, true -> msg_callback(state, msgitem) end) + %{state | mst: mst} end def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do @@ -154,14 +186,6 @@ defmodule SApp.Chat do {:noreply, %{ state | subs: new_subs }} end - defp push_messages(state, to, start, num) do - case ML.read(state.store, start, num) do - {:ok, list, rest} -> - Shard.Manager.send(to, {state.id, nil, {:info, start, list, rest}}) - _ -> nil - end - end - defp msg_callback(state, {ts, nick, msg}) do for pid <- state.subs do if Process.alive?(pid) do diff --git a/lib/cli/cli.ex b/lib/cli/cli.ex index 8928040..2fbf8c2 100644 --- a/lib/cli/cli.ex +++ b/lib/cli/cli.ex @@ -64,15 +64,10 @@ defmodule SCLI do end defp handle_command(pid, ["hist"]) do - case GenServer.call(pid, {:read_history, nil, 100}) do - {:ok, list, _rest} -> - list - |> Enum.reverse - |> Enum.each(fn {ts, nick, msg} -> - IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}" + GenServer.call(pid, {:read_history, nil, 25}) + |> Enum.each(fn {{ts, nick, msg}, true} -> + IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}" end) - _ -> nil - end pid end diff --git a/lib/data/merklesearchtree.ex b/lib/data/merklesearchtree.ex index 0554f82..b751a08 100644 --- a/lib/data/merklesearchtree.ex +++ b/lib/data/merklesearchtree.ex @@ -48,17 +48,15 @@ defmodule SData.MerkleSearchTree do end - + @doc""" + Insert an item into the search tree. + """ def insert(state, key, value \\ true) do level = calc_level(key) {hash, store} = insert_at(state, state.store, state.root, key, level, value) %{ state | root: hash, store: store } end - def remove(_state, _key) do - # TODO - end - defp insert_at(s, store, root, key, level, value) do {new_page, store} = if root == nil do { %Page{ level: level, low: nil, list: [ { key, value, nil } ] }, store } @@ -190,34 +188,118 @@ defmodule SData.MerkleSearchTree do end end - def get(state, key) do - get(state, state.store, state.root, key) + @doc""" + Merge values from another MST in this MST + """ + def merge(to, from, callback \\ fn _, _ -> nil end) do + { store, root } = merge_aux(to, from, to.store, to.root, from.root, callback) + %{ to | store: store, root: root } + end + + defp merge_aux(s1, s2, store, r1, r2, callback) do + case {r1, r2} do + {_, nil} -> + { store, r1 } + {nil, _} -> + store = Store.copy(store, s2.store, r2) + rec_callback(store, r2, callback) + { store, r2 } + _ -> + IO.puts("not implemented: complex merge step") + #TODO + { store, r1 } + end end - defp get(s, store, root, key) do + defp rec_callback(store, root, callback) do case root do nil -> nil _ -> %Page{ level: _, low: low, list: lst } = Store.get(store, root) - get_aux(s, store, low, lst, key) + rec_callback(store, low, callback) + for {k, v, rst} <- lst do + callback.(k, v) + rec_callback(store, rst, callback) + end + end + end + + @doc""" + Get value for a specific key in search tree. + """ + def get(state, key) do + get(state, state.root, key) + end + + defp get(s, root, key) do + case root do + nil -> nil + _ -> + %Page{ level: _, low: low, list: lst } = Store.get(s.store, root) + get_aux(s, low, lst, key) end end - defp get_aux(s, store, low, lst, key) do + defp get_aux(s, low, lst, key) do case lst do [] -> - get(s, store, low, key) + get(s, low, key) [ {k, v, low2} | rst ] -> case s.cmp.(key, k) do :duplicate -> v :before -> - get(s, store, low, key) + get(s, low, key) :after -> - get_aux(s, store, low2, rst, key) + get_aux(s, low2, rst, key) end end end + @doc""" + Get the last n items of the tree, or the last n items + strictly before given upper bound if non nil + """ + def last(state, top_bound, num) do + last(state, state.root, top_bound, num) + end + + defp last(s, root, top_bound, num) do + case root do + nil -> [] + _ -> + %Page{ level: _, low: low, list: lst } = Store.get(s.store, root) + last_aux(s, low, lst, top_bound, num) + end + end + + defp last_aux(s, low, lst, top_bound, num) do + case lst do + [] -> + last(s, low, top_bound, num) + [ {k, v, low2} | rst ] -> + if top_bound == nil or s.cmp.(top_bound, k) == :after do + items = last_aux(s, low2, rst, top_bound, num) + items = if Enum.count(items) < num do + [ {k, v} | items ] + else + items + end + cnt = Enum.count items + if cnt < num do + last(s, low, top_bound, num - cnt) ++ items + else + items + end + else + last(s, low, top_bound, num) + end + end + end + + + @doc""" + Dump Merkle search tree structure. + """ def dump(state) do dump(state.store, state.root, "") end @@ -251,5 +333,4 @@ defmodule SData.MerkleSearchTree do defp count_leading_zeroes(_) do 0 end - end diff --git a/test/mst_test.exs b/test/mst_test.exs index 73b4f63..7fe340e 100644 --- a/test/mst_test.exs +++ b/test/mst_test.exs @@ -107,5 +107,25 @@ defmodule ShardTest.MST do IO.puts "y.root: #{y.root|>Base.encode16}" IO.puts "z.root: #{z.root|>Base.encode16}" assert y.root == z.root + + MST.last(y, nil, 10) + end + + test "merkle search tree 5" do + y = Enum.reduce(0..1000, %MST{}, + fn i, acc -> MST.insert(acc, i) end) + + assert(MST.last(y, nil, 2) == [{999, true}, {1000, true}]) + assert(MST.last(y, 42, 2) == [{40, true}, {41, true}]) + + stuff = for i <- 100..199, do: {i, true} + assert MST.last(y, 200, 100) == stuff + + stuff = for i <- 200..299, do: {i, true} + assert MST.last(y, 300, 100) == stuff + + stuff = for i <- 200..499, do: {i, true} + assert MST.last(y, 500, 300) == stuff end + end |