diff options
author | Alex Auvolat <alex@adnab.me> | 2018-09-01 16:06:23 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2018-09-01 16:06:23 +0200 |
commit | c6ec33d6e612168e14d77007915a4ea423c55a2e (patch) | |
tree | 8b5645651a0cc991b8ac9c68c388d84c8dbe73d2 /shard/lib/data | |
parent | 1a0ef154a421af60f6d57dfe861dacb844a7d142 (diff) | |
download | shard-c6ec33d6e612168e14d77007915a4ea423c55a2e.tar.gz shard-c6ec33d6e612168e14d77007915a4ea423c55a2e.zip |
Move everything to subdirectory
Diffstat (limited to 'shard/lib/data')
-rw-r--r-- | shard/lib/data/data.ex | 49 | ||||
-rw-r--r-- | shard/lib/data/merklelist.ex | 143 | ||||
-rw-r--r-- | shard/lib/data/merklesearchtree.ex | 387 | ||||
-rw-r--r-- | shard/lib/data/store.ex | 91 |
4 files changed, 670 insertions, 0 deletions
diff --git a/shard/lib/data/data.ex b/shard/lib/data/data.ex new file mode 100644 index 0000000..c2c659d --- /dev/null +++ b/shard/lib/data/data.ex @@ -0,0 +1,49 @@ +defmodule SData do + @moduledoc """ + Utility functions + + Compare functions are functions that compares stored items and provides a total order. + They must return: + - `:after` if the first argument is more recent + - `:duplicate` if the two items are the same + - `:before` if the first argument is older + These functions must only return :duplicate for equal items. + """ + + @doc """ + Calculate the hash of an Erlang term by first converting it to its + binary representation. + """ + def term_hash(term, algo \\ :sha256) do + :crypto.hash(algo, (:erlang.term_to_binary term)) + end + + @doc""" + Compare function for arbitrary terms using the Erlang order + """ + def cmp_term(a, b) do + cond do + a > b -> :after + a < b -> :before + a == b -> :duplicate + end + end + + @doc""" + Compare function for timestamped strings + """ + def cmp_ts_str({ts1, str1}, {ts2, str2}) do + cond do + ts1 > ts2 -> :after + ts1 < ts2 -> :before + str1 > str2 -> :after + str1 < str2 -> :before + true -> :duplicate + end + end + + @doc""" + Merge function for nils + """ + def merge_true(true, true), do: true +end diff --git a/shard/lib/data/merklelist.ex b/shard/lib/data/merklelist.ex new file mode 100644 index 0000000..9b44ee8 --- /dev/null +++ b/shard/lib/data/merklelist.ex @@ -0,0 +1,143 @@ +defmodule SData.MerkleList do + @moduledoc""" + A simple Merkle list store. + + Future improvements: + - When messages are inserted other than at the top, all intermediate hashes + change. Keep track of the mapping from old hashes to new hashes so that get + requests can work even for hashes that are not valid anymore. + - group items in "pages" (bigger bundles) + """ + + defstruct [:root, :top, :cmp, :store] + + @doc""" + Create a Merkle list store. + + `cmp` is a compare function that respects the interface defined in module `SData`. + """ + def new(cmp) do + root_item = :root + root_hash = SData.term_hash root_item + state = %SData.MerkleList{ + root: root_hash, + top: root_hash, + cmp: cmp, + store: %{ root_hash => root_item } + } + state + end + + defp push(state, item) do + new_item = {item, state.top} + new_item_hash = SData.term_hash new_item + new_store = Map.put(state.store, new_item_hash, new_item) + %{ state | :top => new_item_hash, :store => new_store } + end + + defp pop(state) do + if state.top == state.root do + :error + else + {item, next} = Map.get(state.store, state.top) + new_store = Map.delete(state.store, state.top) + new_state = %{ state | :top => next, :store => new_store } + {:ok, item, new_state} + end + end + + @doc""" + Insert a list of items in the store. + + A callback function may be specified that is called on any item + that is sucessfully added, i.e. that wasn't present in the store before. + """ + def insert_many(state, items, callback \\ (fn _ -> nil end)) do + items_sorted = Enum.sort(items, fn (x, y) -> state.cmp.(x, y) == :after end) + insert_many_aux(state, items_sorted, callback) + end + + defp insert_many_aux(state, [], _callback) do + state + end + + defp insert_many_aux(state, [item | rest], callback) do + case pop(state) do + :error -> + new_state = push(insert_many_aux(state, rest, callback), item) + callback.(item) + new_state + {:ok, front, state_rest} -> + case state.cmp.(item, front) do + :after -> + new_state = push(insert_many_aux(state, rest, callback), item) + callback.(item) + new_state + :duplicate -> insert_many_aux(state, rest, callback) + :before -> push(insert_many_aux(state_rest, [item | rest], callback), front) + end + end + end + + @doc""" + Insert a single item in the store. + + A callback function may be specified that is called on the item + if it is sucessfully added, i.e. it wasn't present in the store before. + """ + def insert(state, item, callback \\ (fn _ -> nil end)) do + insert_many(state, [item], callback) + end + + @doc""" + Read some items from the state. + + The two parameters are optional: + - qbegin : hash of the first item to read + - qlimit : number of items to read + """ + def read(state, qbegin \\ nil, qlimit \\ nil) do + begin = qbegin || state.top + limit = qlimit || 20 + get_items_list(state, begin, limit) + end + + @doc""" + Get the hash of the last item + """ + def top(state) do + state.top + end + + @doc""" + Get the hash of the root item + """ + def root(state) do + state.root + end + + @doc""" + Check if the store holds a certain item + """ + def has(state, hash) do + Map.has_key?(state.store, hash) + end + + defp get_items_list(state, begin, limit) do + case limit do + 0 -> {:ok, [], begin} + _ -> + case Map.fetch(state.store, begin) do + {:ok, :root} -> + {:ok, [], nil } + {:ok, {item, next}} -> + case get_items_list(state, next, limit - 1) do + {:ok, rest, past} -> + {:ok, [ item | rest ], past } + {:error, reason} -> {:error, reason} + end + :error -> {:error, begin} + end + end + end +end diff --git a/shard/lib/data/merklesearchtree.ex b/shard/lib/data/merklesearchtree.ex new file mode 100644 index 0000000..941d31d --- /dev/null +++ b/shard/lib/data/merklesearchtree.ex @@ -0,0 +1,387 @@ +defmodule SData.MerkleSearchTree do + @moduledoc""" + A Merkle search tree. + + A node of the tree is + { + level, + hash_of_node | nil, + [ + { item_low_bound, hash_of_node | nil }, + { item_low_bound, hash_of_node | nil }, + ... + } + } + """ + + alias SData.PageStore, as: Store + + @doc""" + Create a new Merkle search tree. + + This structure can be used as a set with only true keys, + or as a map if a merge function is given. + + `cmp` is a compare function for keys as defined in module `SData`. + + `merge` is a function for merging two items that have the same key. + """ + defstruct root: nil, + store: SData.LocalStore.new, + cmp: &SData.cmp_term/2, + merge: &SData.merge_true/2 + + + defmodule Page do + defstruct [:level, :low, :list] + end + + defimpl SData.Page, for: Page do + def refs(page) do + refs = for {_, _, h} <- page.list, h != nil, do: h + if page.low != nil do + [ page.low | refs ] + else + refs + end + end + end + + + @doc""" + Insert an item into the search tree. + """ + def insert(state, key, value \\ true) do + level = calc_level(key) + {hash, store} = insert_at(state, state.store, state.root, key, level, value) + %{ state | root: hash, store: store } + end + + defp insert_at(s, store, root, key, level, value) do + {new_page, store} = if root == nil do + { %Page{ level: level, low: nil, list: [ { key, value, nil } ] }, store } + else + %Page{ level: plevel, low: low, list: lst } = Store.get(store, root) + [ { k0, _, _} | _ ] = lst + cond do + plevel < level -> + {low, high, store} = split(s, store, root, key) + { %Page{ level: level, low: low, list: [ { key, value, high } ] }, store } + plevel == level -> + store = Store.free(store, root) + case s.cmp.(key, k0) do + :before -> + {low2a, low2b, store} = split(s, store, low, key) + { %Page{ level: level, low: low2a, list: [ { key, value, low2b } | lst] }, store } + _ -> + {new_lst, store} = aux_insert_after_first(s, store, lst, key, value) + { %Page{ level: plevel, low: low, list: new_lst }, store } + end + plevel > level -> + store = Store.free(store, root) + case s.cmp.(key, k0) do + :before -> + {new_low, store} = insert_at(s, store, low, key, level, value) + { %Page{ level: plevel, low: new_low, list: lst }, store } + :after -> + {new_lst, store} = aux_insert_sub_after_first(s, store, lst, key, level, value) + { %Page{ level: plevel, low: low, list: new_lst }, store } + end + end + end + Store.put(store, new_page) # returns {hash, store} + end + + defp split(s, store, hash, key) do + if hash == nil do + {nil, nil, store} + else + %Page{ level: level, low: low, list: lst } = Store.get(store, hash) || Store.get(s.store, hash) + store = Store.free(store, hash) + [ { k0, _, _} | _ ] = lst + case s.cmp.(key, k0) do + :before -> + {lowlow, lowhi, store} = split(s, store, low, key) + newp2 = %Page{ level: level, low: lowhi, list: lst } + {newp2h, store} = Store.put(store, newp2) + {lowlow, newp2h, store} + :after -> + {lst1, p2, store} = split_aux(s, store, lst, key, level) + newp1 = %Page{ level: level, low: low, list: lst1} + {newp1h, store} = Store.put(store, newp1) + {newp1h, p2, store} + end + end + end + + defp split_aux(s, store, lst, key, level) do + case lst do + [ {k1, v1, r1} ] -> + if s.cmp.(k1, key) == :duplicate do + raise "Bad logic" + end + {r1l, r1h, store} = split(s, store, r1, key) + { [{k1, v1, r1l}], r1h, store } + [ {k1, v1, r1} = fst, {k2, v2, r2} = rst1 | rst ] -> + case s.cmp.(key, k2) do + :before -> + {r1l, r1h, store} = split(s, store, r1, key) + p2 = %Page{ level: level, low: r1h, list: [ {k2, v2, r2} | rst ] } + {p2h, store} = Store.put(store, p2) + { [{k1, v1, r1l}], p2h, store } + :after -> + {rst2, hi, store} = split_aux(s, store, [rst1 | rst], key, level) + { [ fst | rst2 ], hi, store } + :duplicate -> + raise "Bad logic" + end + end + end + + defp aux_insert_after_first(s, store, lst, key, value) do + case lst do + [ {k1, v1, r1} ] -> + case s.cmp.(k1, key) do + :duplicate -> + { [ {k1, s.merge.(v1, value), r1} ], store } + :before -> + {r1a, r1b, new_store} = split(s, store, r1, key) + { [ {k1, v1, r1a}, {key, value, r1b} ], new_store } + end + [ {k1, v1, r1} = fst, {k2, v2, r2} = rst1 | rst ] -> + case s.cmp.(k1, key) do + :duplicate -> + { [ {k1, s.merge.(v1, value), r1}, rst1 | rst ], store } + :before -> + case s.cmp.(k2, key) do + :after -> + {r1a, r1b, new_store} = split(s, store, r1, key) + { [ {k1, v1, r1a}, {key, value, r1b}, {k2, v2, r2} | rst ], new_store } + _ -> + {rst2, new_store} = aux_insert_after_first(s, store, [rst1 | rst], key, value) + { [ fst | rst2 ], new_store } + end + end + end + end + + defp aux_insert_sub_after_first(s, store, lst, key, level, value) do + case lst do + [ {k1, v1, r1} ] -> + if s.cmp.(k1, key) == :duplicate do + raise "Bad logic" + end + {r1new, store_new} = insert_at(s, store, r1, key, level, value) + { [{k1, v1, r1new}], store_new } + [ {k1, v1, r1} = fst, {k2, _, _} = rst1 | rst ] -> + if s.cmp.(k1, key) == :duplicate do + raise "Bad logic" + end + case s.cmp.(key, k2) do + :before -> + {r1new, store_new} = insert_at(s, store, r1, key, level, value) + { [{k1, v1, r1new}, rst1 | rst], store_new } + _ -> + {rst2, new_store} = aux_insert_sub_after_first(s, store, [rst1 |rst], key, level, value) + { [ fst | rst2 ], new_store } + end + end + end + + @doc""" + Merge values from another MST in this MST. + + The merge is not symmetrical in the sense that: + - new pages are added in the store of the first argument + - the callback is called for all items found in the second argument and not the first + """ + def merge(to, from, callback \\ fn _, _ -> nil end) do + { store, root } = merge_aux(to, from, to.store, to.root, from.root, callback) + %{ to | store: store, root: root } + end + + defp merge_aux(s1, s2, store, r1, r2, callback) do + case {r1, r2} do + _ when r1 == r2 -> { store, r1 } + {_, nil} -> + { store, r1 } + {nil, _} -> + store = Store.copy(store, s2.store, r2) + rec_callback(store, r2, callback) + { store, r2 } + _ -> + %Page{ level: level1, low: low1, list: lst1 } = Store.get(store, r1) + %Page{ level: level2, low: low2, list: lst2 } = Store.get(store, r2) || Store.get(s2.store, r2) + { level, low1, lst1, low2, lst2 } = cond do + level1 == level2 -> {level1, low1, lst1, low2, lst2} + level1 > level2 -> {level1, low1, lst1, r2, []} + level2 > level1 -> {level2, r1, [], low2, lst2} + end + { store, low, lst } = merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback) + page = %Page{ level: level, low: low, list: lst } + {hash, store} = Store.put(store, page) + {store, hash} + end + end + + defp merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback) do + case {lst1, lst2} do + { [], [] } -> + {store, hash} = merge_aux(s1, s2, store, low1, low2, callback) + {store, hash, []} + { [], [ {k, v, r} | rst2 ] } -> + {low1l, low1h, store} = split(s1, store, low1, k) + {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback) + callback.(k, v) + {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, [], r, rst2, callback) + {store, newlow, [ {k, v, newr} | newrst ]} + { [ {k, v, r} | rst1 ], [] } -> + {low2l, low2h, store} = split(s2, store, low2, k) + {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback) + {store, newr, newrst} = merge_aux_rec(s1, s2, store, r, rst1, low2h, [], callback) + {store, newlow, [ {k, v, newr} | newrst ]} + { [ {k1, v1, r1} | rst1 ], [ {k2, v2, r2} | rst2 ] } -> + case s1.cmp.(k1, k2) do + :before -> + {low2l, low2h, store} = split(s2, store, low2, k1) + {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback) + {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, low2h, lst2, callback) + {store, newlow, [ {k1, v1, newr} | newrst ]} + :after -> + {low1l, low1h, store} = split(s1, store, low1, k2) + {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback) + callback.(k2, v2) + {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, lst1, r2, rst2, callback) + {store, newlow, [ {k2, v2, newr} | newrst ]} + :duplicate -> + {store, newlow} = merge_aux(s1, s2, store, low1, low2, callback) + newv = s1.merge.(v1, v2) ## TODO: callback here ?? + {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, r2, rst2, callback) + {store, newlow, [ {k1, newv, newr} | newrst ]} + end + end + end + + defp rec_callback(store, root, callback) do + case root do + nil -> nil + _ -> + %Page{ level: _, low: low, list: lst } = Store.get(store, root) + rec_callback(store, low, callback) + for {k, v, rst} <- lst do + callback.(k, v) + rec_callback(store, rst, callback) + end + end + end + + @doc""" + Get value for a specific key in search tree. + """ + def get(state, key) do + get(state, state.root, key) + end + + defp get(s, root, key) do + case root do + nil -> nil + _ -> + %Page{ level: _, low: low, list: lst } = Store.get(s.store, root) + get_aux(s, low, lst, key) + end + end + + defp get_aux(s, low, lst, key) do + case lst do + [] -> + get(s, low, key) + [ {k, v, low2} | rst ] -> + case s.cmp.(key, k) do + :duplicate -> v + :before -> + get(s, low, key) + :after -> + get_aux(s, low2, rst, key) + end + end + end + + @doc""" + Get the last n items of the tree, or the last n items + strictly before given upper bound if non nil + """ + def last(state, top_bound, num) do + last(state, state.root, top_bound, num) + end + + defp last(s, root, top_bound, num) do + case root do + nil -> [] + _ -> + %Page{ level: _, low: low, list: lst } = Store.get(s.store, root) + last_aux(s, low, lst, top_bound, num) + end + end + + defp last_aux(s, low, lst, top_bound, num) do + case lst do + [] -> + last(s, low, top_bound, num) + [ {k, v, low2} | rst ] -> + if top_bound == nil or s.cmp.(top_bound, k) == :after do + items = last_aux(s, low2, rst, top_bound, num) + items = if Enum.count(items) < num do + [ {k, v} | items ] + else + items + end + cnt = Enum.count items + if cnt < num do + last(s, low, top_bound, num - cnt) ++ items + else + items + end + else + last(s, low, top_bound, num) + end + end + end + + + @doc""" + Dump Merkle search tree structure. + """ + def dump(state) do + dump(state.store, state.root, "") + end + + defp dump(store, root, lvl) do + case root do + nil -> + IO.puts(lvl <> "nil") + _ -> + %Page{ level: level, low: low, list: lst} = Store.get(store, root) + IO.puts(lvl <> "#{root|>Base.encode16} (#{level})") + dump(store, low, lvl <> " ") + for {k, v, r} <- lst do + IO.puts(lvl<>"- #{inspect k} => #{inspect v}") + dump(store, r, lvl <> " ") + end + end + end + + defp calc_level(key) do + key + |> SData.term_hash + |> Base.encode16 + |> String.to_charlist + |> count_leading_zeroes + end + + defp count_leading_zeroes('0' ++ rest) do + 1 + count_leading_zeroes(rest) + end + defp count_leading_zeroes(_) do + 0 + end +end diff --git a/shard/lib/data/store.ex b/shard/lib/data/store.ex new file mode 100644 index 0000000..ca12cd0 --- /dev/null +++ b/shard/lib/data/store.ex @@ -0,0 +1,91 @@ +defprotocol SData.Page do + @moduledoc""" + Protocol to be implemented by objects that are used as data pages + in a pagestore and that may reference other data pages by their hash. + """ + + @fallback_to_any true + + @doc""" + Get hashes of all pages referenced by this page. + """ + def refs(page) +end + +defimpl SData.Page, for: Any do + def refs(_page), do: [] +end + + +defprotocol SData.PageStore do + @moduledoc""" + Protocol to be implemented for page stores to allow their + manipulation. + + This protocol may also be implemented by store proxies that track + operations and implement different synchronization or caching mechanisms. + """ + + @doc""" + Put a page. Argument is the content of the page, returns the + hash that the store has associated to it. + + Returns {hash, store} + """ + def put(store, page) + + @doc""" + Get a page referenced by its hash. + + Returns page + """ + def get(store, hash) + + @doc""" + Copy to the store a page and all its references from the other store. + In the case of pages on the network in a distributed store, this may + be lazy. + + Returns store + """ + def copy(store, other_store, hash) + + @doc""" + Free a page referenced by its hash, marking it as no longer needed. + + Returns store + """ + def free(store, hash) +end + + +defmodule SData.LocalStore do + defstruct [:pages] + + def new() do + %SData.LocalStore{ pages: %{} } + end +end + +defimpl SData.PageStore, for: SData.LocalStore do + def put(store, page) do + hash = SData.term_hash page + store = %{ store | pages: Map.put(store.pages, hash, page) } + { hash, store } + end + + def get(store, hash) do + store.pages[hash] + end + + def copy(store, other_store, hash) do + page = SData.PageStore.get(other_store, hash) + refs = SData.Page.refs(page) + store = Enum.reduce(refs, store, fn x, acc -> copy(acc, other_store, x) end) + %{ store | pages: Map.put(store.pages, hash, page) } + end + + def free(store, hash) do + %{ store | pages: Map.delete(store.pages, hash) } + end +end |