aboutsummaryrefslogtreecommitdiff
path: root/shard/lib/data
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2018-09-01 16:06:23 +0200
committerAlex Auvolat <alex@adnab.me>2018-09-01 16:06:23 +0200
commitc6ec33d6e612168e14d77007915a4ea423c55a2e (patch)
tree8b5645651a0cc991b8ac9c68c388d84c8dbe73d2 /shard/lib/data
parent1a0ef154a421af60f6d57dfe861dacb844a7d142 (diff)
downloadshard-c6ec33d6e612168e14d77007915a4ea423c55a2e.tar.gz
shard-c6ec33d6e612168e14d77007915a4ea423c55a2e.zip
Move everything to subdirectory
Diffstat (limited to 'shard/lib/data')
-rw-r--r--shard/lib/data/data.ex49
-rw-r--r--shard/lib/data/merklelist.ex143
-rw-r--r--shard/lib/data/merklesearchtree.ex387
-rw-r--r--shard/lib/data/store.ex91
4 files changed, 670 insertions, 0 deletions
diff --git a/shard/lib/data/data.ex b/shard/lib/data/data.ex
new file mode 100644
index 0000000..c2c659d
--- /dev/null
+++ b/shard/lib/data/data.ex
@@ -0,0 +1,49 @@
+defmodule SData do
+ @moduledoc """
+ Utility functions
+
+ Compare functions are functions that compares stored items and provides a total order.
+ They must return:
+ - `:after` if the first argument is more recent
+ - `:duplicate` if the two items are the same
+ - `:before` if the first argument is older
+ These functions must only return :duplicate for equal items.
+ """
+
+ @doc """
+ Calculate the hash of an Erlang term by first converting it to its
+ binary representation.
+ """
+ def term_hash(term, algo \\ :sha256) do
+ :crypto.hash(algo, (:erlang.term_to_binary term))
+ end
+
+ @doc"""
+ Compare function for arbitrary terms using the Erlang order
+ """
+ def cmp_term(a, b) do
+ cond do
+ a > b -> :after
+ a < b -> :before
+ a == b -> :duplicate
+ end
+ end
+
+ @doc"""
+ Compare function for timestamped strings
+ """
+ def cmp_ts_str({ts1, str1}, {ts2, str2}) do
+ cond do
+ ts1 > ts2 -> :after
+ ts1 < ts2 -> :before
+ str1 > str2 -> :after
+ str1 < str2 -> :before
+ true -> :duplicate
+ end
+ end
+
+ @doc"""
+ Merge function for nils
+ """
+ def merge_true(true, true), do: true
+end
diff --git a/shard/lib/data/merklelist.ex b/shard/lib/data/merklelist.ex
new file mode 100644
index 0000000..9b44ee8
--- /dev/null
+++ b/shard/lib/data/merklelist.ex
@@ -0,0 +1,143 @@
+defmodule SData.MerkleList do
+ @moduledoc"""
+ A simple Merkle list store.
+
+ Future improvements:
+ - When messages are inserted other than at the top, all intermediate hashes
+ change. Keep track of the mapping from old hashes to new hashes so that get
+ requests can work even for hashes that are not valid anymore.
+ - group items in "pages" (bigger bundles)
+ """
+
+ defstruct [:root, :top, :cmp, :store]
+
+ @doc"""
+ Create a Merkle list store.
+
+ `cmp` is a compare function that respects the interface defined in module `SData`.
+ """
+ def new(cmp) do
+ root_item = :root
+ root_hash = SData.term_hash root_item
+ state = %SData.MerkleList{
+ root: root_hash,
+ top: root_hash,
+ cmp: cmp,
+ store: %{ root_hash => root_item }
+ }
+ state
+ end
+
+ defp push(state, item) do
+ new_item = {item, state.top}
+ new_item_hash = SData.term_hash new_item
+ new_store = Map.put(state.store, new_item_hash, new_item)
+ %{ state | :top => new_item_hash, :store => new_store }
+ end
+
+ defp pop(state) do
+ if state.top == state.root do
+ :error
+ else
+ {item, next} = Map.get(state.store, state.top)
+ new_store = Map.delete(state.store, state.top)
+ new_state = %{ state | :top => next, :store => new_store }
+ {:ok, item, new_state}
+ end
+ end
+
+ @doc"""
+ Insert a list of items in the store.
+
+ A callback function may be specified that is called on any item
+ that is sucessfully added, i.e. that wasn't present in the store before.
+ """
+ def insert_many(state, items, callback \\ (fn _ -> nil end)) do
+ items_sorted = Enum.sort(items, fn (x, y) -> state.cmp.(x, y) == :after end)
+ insert_many_aux(state, items_sorted, callback)
+ end
+
+ defp insert_many_aux(state, [], _callback) do
+ state
+ end
+
+ defp insert_many_aux(state, [item | rest], callback) do
+ case pop(state) do
+ :error ->
+ new_state = push(insert_many_aux(state, rest, callback), item)
+ callback.(item)
+ new_state
+ {:ok, front, state_rest} ->
+ case state.cmp.(item, front) do
+ :after ->
+ new_state = push(insert_many_aux(state, rest, callback), item)
+ callback.(item)
+ new_state
+ :duplicate -> insert_many_aux(state, rest, callback)
+ :before -> push(insert_many_aux(state_rest, [item | rest], callback), front)
+ end
+ end
+ end
+
+ @doc"""
+ Insert a single item in the store.
+
+ A callback function may be specified that is called on the item
+ if it is sucessfully added, i.e. it wasn't present in the store before.
+ """
+ def insert(state, item, callback \\ (fn _ -> nil end)) do
+ insert_many(state, [item], callback)
+ end
+
+ @doc"""
+ Read some items from the state.
+
+ The two parameters are optional:
+ - qbegin : hash of the first item to read
+ - qlimit : number of items to read
+ """
+ def read(state, qbegin \\ nil, qlimit \\ nil) do
+ begin = qbegin || state.top
+ limit = qlimit || 20
+ get_items_list(state, begin, limit)
+ end
+
+ @doc"""
+ Get the hash of the last item
+ """
+ def top(state) do
+ state.top
+ end
+
+ @doc"""
+ Get the hash of the root item
+ """
+ def root(state) do
+ state.root
+ end
+
+ @doc"""
+ Check if the store holds a certain item
+ """
+ def has(state, hash) do
+ Map.has_key?(state.store, hash)
+ end
+
+ defp get_items_list(state, begin, limit) do
+ case limit do
+ 0 -> {:ok, [], begin}
+ _ ->
+ case Map.fetch(state.store, begin) do
+ {:ok, :root} ->
+ {:ok, [], nil }
+ {:ok, {item, next}} ->
+ case get_items_list(state, next, limit - 1) do
+ {:ok, rest, past} ->
+ {:ok, [ item | rest ], past }
+ {:error, reason} -> {:error, reason}
+ end
+ :error -> {:error, begin}
+ end
+ end
+ end
+end
diff --git a/shard/lib/data/merklesearchtree.ex b/shard/lib/data/merklesearchtree.ex
new file mode 100644
index 0000000..941d31d
--- /dev/null
+++ b/shard/lib/data/merklesearchtree.ex
@@ -0,0 +1,387 @@
+defmodule SData.MerkleSearchTree do
+ @moduledoc"""
+ A Merkle search tree.
+
+ A node of the tree is
+ {
+ level,
+ hash_of_node | nil,
+ [
+ { item_low_bound, hash_of_node | nil },
+ { item_low_bound, hash_of_node | nil },
+ ...
+ }
+ }
+ """
+
+ alias SData.PageStore, as: Store
+
+ @doc"""
+ Create a new Merkle search tree.
+
+ This structure can be used as a set with only true keys,
+ or as a map if a merge function is given.
+
+ `cmp` is a compare function for keys as defined in module `SData`.
+
+ `merge` is a function for merging two items that have the same key.
+ """
+ defstruct root: nil,
+ store: SData.LocalStore.new,
+ cmp: &SData.cmp_term/2,
+ merge: &SData.merge_true/2
+
+
+ defmodule Page do
+ defstruct [:level, :low, :list]
+ end
+
+ defimpl SData.Page, for: Page do
+ def refs(page) do
+ refs = for {_, _, h} <- page.list, h != nil, do: h
+ if page.low != nil do
+ [ page.low | refs ]
+ else
+ refs
+ end
+ end
+ end
+
+
+ @doc"""
+ Insert an item into the search tree.
+ """
+ def insert(state, key, value \\ true) do
+ level = calc_level(key)
+ {hash, store} = insert_at(state, state.store, state.root, key, level, value)
+ %{ state | root: hash, store: store }
+ end
+
+ defp insert_at(s, store, root, key, level, value) do
+ {new_page, store} = if root == nil do
+ { %Page{ level: level, low: nil, list: [ { key, value, nil } ] }, store }
+ else
+ %Page{ level: plevel, low: low, list: lst } = Store.get(store, root)
+ [ { k0, _, _} | _ ] = lst
+ cond do
+ plevel < level ->
+ {low, high, store} = split(s, store, root, key)
+ { %Page{ level: level, low: low, list: [ { key, value, high } ] }, store }
+ plevel == level ->
+ store = Store.free(store, root)
+ case s.cmp.(key, k0) do
+ :before ->
+ {low2a, low2b, store} = split(s, store, low, key)
+ { %Page{ level: level, low: low2a, list: [ { key, value, low2b } | lst] }, store }
+ _ ->
+ {new_lst, store} = aux_insert_after_first(s, store, lst, key, value)
+ { %Page{ level: plevel, low: low, list: new_lst }, store }
+ end
+ plevel > level ->
+ store = Store.free(store, root)
+ case s.cmp.(key, k0) do
+ :before ->
+ {new_low, store} = insert_at(s, store, low, key, level, value)
+ { %Page{ level: plevel, low: new_low, list: lst }, store }
+ :after ->
+ {new_lst, store} = aux_insert_sub_after_first(s, store, lst, key, level, value)
+ { %Page{ level: plevel, low: low, list: new_lst }, store }
+ end
+ end
+ end
+ Store.put(store, new_page) # returns {hash, store}
+ end
+
+ defp split(s, store, hash, key) do
+ if hash == nil do
+ {nil, nil, store}
+ else
+ %Page{ level: level, low: low, list: lst } = Store.get(store, hash) || Store.get(s.store, hash)
+ store = Store.free(store, hash)
+ [ { k0, _, _} | _ ] = lst
+ case s.cmp.(key, k0) do
+ :before ->
+ {lowlow, lowhi, store} = split(s, store, low, key)
+ newp2 = %Page{ level: level, low: lowhi, list: lst }
+ {newp2h, store} = Store.put(store, newp2)
+ {lowlow, newp2h, store}
+ :after ->
+ {lst1, p2, store} = split_aux(s, store, lst, key, level)
+ newp1 = %Page{ level: level, low: low, list: lst1}
+ {newp1h, store} = Store.put(store, newp1)
+ {newp1h, p2, store}
+ end
+ end
+ end
+
+ defp split_aux(s, store, lst, key, level) do
+ case lst do
+ [ {k1, v1, r1} ] ->
+ if s.cmp.(k1, key) == :duplicate do
+ raise "Bad logic"
+ end
+ {r1l, r1h, store} = split(s, store, r1, key)
+ { [{k1, v1, r1l}], r1h, store }
+ [ {k1, v1, r1} = fst, {k2, v2, r2} = rst1 | rst ] ->
+ case s.cmp.(key, k2) do
+ :before ->
+ {r1l, r1h, store} = split(s, store, r1, key)
+ p2 = %Page{ level: level, low: r1h, list: [ {k2, v2, r2} | rst ] }
+ {p2h, store} = Store.put(store, p2)
+ { [{k1, v1, r1l}], p2h, store }
+ :after ->
+ {rst2, hi, store} = split_aux(s, store, [rst1 | rst], key, level)
+ { [ fst | rst2 ], hi, store }
+ :duplicate ->
+ raise "Bad logic"
+ end
+ end
+ end
+
+ defp aux_insert_after_first(s, store, lst, key, value) do
+ case lst do
+ [ {k1, v1, r1} ] ->
+ case s.cmp.(k1, key) do
+ :duplicate ->
+ { [ {k1, s.merge.(v1, value), r1} ], store }
+ :before ->
+ {r1a, r1b, new_store} = split(s, store, r1, key)
+ { [ {k1, v1, r1a}, {key, value, r1b} ], new_store }
+ end
+ [ {k1, v1, r1} = fst, {k2, v2, r2} = rst1 | rst ] ->
+ case s.cmp.(k1, key) do
+ :duplicate ->
+ { [ {k1, s.merge.(v1, value), r1}, rst1 | rst ], store }
+ :before ->
+ case s.cmp.(k2, key) do
+ :after ->
+ {r1a, r1b, new_store} = split(s, store, r1, key)
+ { [ {k1, v1, r1a}, {key, value, r1b}, {k2, v2, r2} | rst ], new_store }
+ _ ->
+ {rst2, new_store} = aux_insert_after_first(s, store, [rst1 | rst], key, value)
+ { [ fst | rst2 ], new_store }
+ end
+ end
+ end
+ end
+
+ defp aux_insert_sub_after_first(s, store, lst, key, level, value) do
+ case lst do
+ [ {k1, v1, r1} ] ->
+ if s.cmp.(k1, key) == :duplicate do
+ raise "Bad logic"
+ end
+ {r1new, store_new} = insert_at(s, store, r1, key, level, value)
+ { [{k1, v1, r1new}], store_new }
+ [ {k1, v1, r1} = fst, {k2, _, _} = rst1 | rst ] ->
+ if s.cmp.(k1, key) == :duplicate do
+ raise "Bad logic"
+ end
+ case s.cmp.(key, k2) do
+ :before ->
+ {r1new, store_new} = insert_at(s, store, r1, key, level, value)
+ { [{k1, v1, r1new}, rst1 | rst], store_new }
+ _ ->
+ {rst2, new_store} = aux_insert_sub_after_first(s, store, [rst1 |rst], key, level, value)
+ { [ fst | rst2 ], new_store }
+ end
+ end
+ end
+
+ @doc"""
+ Merge values from another MST in this MST.
+
+ The merge is not symmetrical in the sense that:
+ - new pages are added in the store of the first argument
+ - the callback is called for all items found in the second argument and not the first
+ """
+ def merge(to, from, callback \\ fn _, _ -> nil end) do
+ { store, root } = merge_aux(to, from, to.store, to.root, from.root, callback)
+ %{ to | store: store, root: root }
+ end
+
+ defp merge_aux(s1, s2, store, r1, r2, callback) do
+ case {r1, r2} do
+ _ when r1 == r2 -> { store, r1 }
+ {_, nil} ->
+ { store, r1 }
+ {nil, _} ->
+ store = Store.copy(store, s2.store, r2)
+ rec_callback(store, r2, callback)
+ { store, r2 }
+ _ ->
+ %Page{ level: level1, low: low1, list: lst1 } = Store.get(store, r1)
+ %Page{ level: level2, low: low2, list: lst2 } = Store.get(store, r2) || Store.get(s2.store, r2)
+ { level, low1, lst1, low2, lst2 } = cond do
+ level1 == level2 -> {level1, low1, lst1, low2, lst2}
+ level1 > level2 -> {level1, low1, lst1, r2, []}
+ level2 > level1 -> {level2, r1, [], low2, lst2}
+ end
+ { store, low, lst } = merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback)
+ page = %Page{ level: level, low: low, list: lst }
+ {hash, store} = Store.put(store, page)
+ {store, hash}
+ end
+ end
+
+ defp merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback) do
+ case {lst1, lst2} do
+ { [], [] } ->
+ {store, hash} = merge_aux(s1, s2, store, low1, low2, callback)
+ {store, hash, []}
+ { [], [ {k, v, r} | rst2 ] } ->
+ {low1l, low1h, store} = split(s1, store, low1, k)
+ {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback)
+ callback.(k, v)
+ {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, [], r, rst2, callback)
+ {store, newlow, [ {k, v, newr} | newrst ]}
+ { [ {k, v, r} | rst1 ], [] } ->
+ {low2l, low2h, store} = split(s2, store, low2, k)
+ {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback)
+ {store, newr, newrst} = merge_aux_rec(s1, s2, store, r, rst1, low2h, [], callback)
+ {store, newlow, [ {k, v, newr} | newrst ]}
+ { [ {k1, v1, r1} | rst1 ], [ {k2, v2, r2} | rst2 ] } ->
+ case s1.cmp.(k1, k2) do
+ :before ->
+ {low2l, low2h, store} = split(s2, store, low2, k1)
+ {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback)
+ {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, low2h, lst2, callback)
+ {store, newlow, [ {k1, v1, newr} | newrst ]}
+ :after ->
+ {low1l, low1h, store} = split(s1, store, low1, k2)
+ {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback)
+ callback.(k2, v2)
+ {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, lst1, r2, rst2, callback)
+ {store, newlow, [ {k2, v2, newr} | newrst ]}
+ :duplicate ->
+ {store, newlow} = merge_aux(s1, s2, store, low1, low2, callback)
+ newv = s1.merge.(v1, v2) ## TODO: callback here ??
+ {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, r2, rst2, callback)
+ {store, newlow, [ {k1, newv, newr} | newrst ]}
+ end
+ end
+ end
+
+ defp rec_callback(store, root, callback) do
+ case root do
+ nil -> nil
+ _ ->
+ %Page{ level: _, low: low, list: lst } = Store.get(store, root)
+ rec_callback(store, low, callback)
+ for {k, v, rst} <- lst do
+ callback.(k, v)
+ rec_callback(store, rst, callback)
+ end
+ end
+ end
+
+ @doc"""
+ Get value for a specific key in search tree.
+ """
+ def get(state, key) do
+ get(state, state.root, key)
+ end
+
+ defp get(s, root, key) do
+ case root do
+ nil -> nil
+ _ ->
+ %Page{ level: _, low: low, list: lst } = Store.get(s.store, root)
+ get_aux(s, low, lst, key)
+ end
+ end
+
+ defp get_aux(s, low, lst, key) do
+ case lst do
+ [] ->
+ get(s, low, key)
+ [ {k, v, low2} | rst ] ->
+ case s.cmp.(key, k) do
+ :duplicate -> v
+ :before ->
+ get(s, low, key)
+ :after ->
+ get_aux(s, low2, rst, key)
+ end
+ end
+ end
+
+ @doc"""
+ Get the last n items of the tree, or the last n items
+ strictly before given upper bound if non nil
+ """
+ def last(state, top_bound, num) do
+ last(state, state.root, top_bound, num)
+ end
+
+ defp last(s, root, top_bound, num) do
+ case root do
+ nil -> []
+ _ ->
+ %Page{ level: _, low: low, list: lst } = Store.get(s.store, root)
+ last_aux(s, low, lst, top_bound, num)
+ end
+ end
+
+ defp last_aux(s, low, lst, top_bound, num) do
+ case lst do
+ [] ->
+ last(s, low, top_bound, num)
+ [ {k, v, low2} | rst ] ->
+ if top_bound == nil or s.cmp.(top_bound, k) == :after do
+ items = last_aux(s, low2, rst, top_bound, num)
+ items = if Enum.count(items) < num do
+ [ {k, v} | items ]
+ else
+ items
+ end
+ cnt = Enum.count items
+ if cnt < num do
+ last(s, low, top_bound, num - cnt) ++ items
+ else
+ items
+ end
+ else
+ last(s, low, top_bound, num)
+ end
+ end
+ end
+
+
+ @doc"""
+ Dump Merkle search tree structure.
+ """
+ def dump(state) do
+ dump(state.store, state.root, "")
+ end
+
+ defp dump(store, root, lvl) do
+ case root do
+ nil ->
+ IO.puts(lvl <> "nil")
+ _ ->
+ %Page{ level: level, low: low, list: lst} = Store.get(store, root)
+ IO.puts(lvl <> "#{root|>Base.encode16} (#{level})")
+ dump(store, low, lvl <> " ")
+ for {k, v, r} <- lst do
+ IO.puts(lvl<>"- #{inspect k} => #{inspect v}")
+ dump(store, r, lvl <> " ")
+ end
+ end
+ end
+
+ defp calc_level(key) do
+ key
+ |> SData.term_hash
+ |> Base.encode16
+ |> String.to_charlist
+ |> count_leading_zeroes
+ end
+
+ defp count_leading_zeroes('0' ++ rest) do
+ 1 + count_leading_zeroes(rest)
+ end
+ defp count_leading_zeroes(_) do
+ 0
+ end
+end
diff --git a/shard/lib/data/store.ex b/shard/lib/data/store.ex
new file mode 100644
index 0000000..ca12cd0
--- /dev/null
+++ b/shard/lib/data/store.ex
@@ -0,0 +1,91 @@
+defprotocol SData.Page do
+ @moduledoc"""
+ Protocol to be implemented by objects that are used as data pages
+ in a pagestore and that may reference other data pages by their hash.
+ """
+
+ @fallback_to_any true
+
+ @doc"""
+ Get hashes of all pages referenced by this page.
+ """
+ def refs(page)
+end
+
+defimpl SData.Page, for: Any do
+ def refs(_page), do: []
+end
+
+
+defprotocol SData.PageStore do
+ @moduledoc"""
+ Protocol to be implemented for page stores to allow their
+ manipulation.
+
+ This protocol may also be implemented by store proxies that track
+ operations and implement different synchronization or caching mechanisms.
+ """
+
+ @doc"""
+ Put a page. Argument is the content of the page, returns the
+ hash that the store has associated to it.
+
+ Returns {hash, store}
+ """
+ def put(store, page)
+
+ @doc"""
+ Get a page referenced by its hash.
+
+ Returns page
+ """
+ def get(store, hash)
+
+ @doc"""
+ Copy to the store a page and all its references from the other store.
+ In the case of pages on the network in a distributed store, this may
+ be lazy.
+
+ Returns store
+ """
+ def copy(store, other_store, hash)
+
+ @doc"""
+ Free a page referenced by its hash, marking it as no longer needed.
+
+ Returns store
+ """
+ def free(store, hash)
+end
+
+
+defmodule SData.LocalStore do
+ defstruct [:pages]
+
+ def new() do
+ %SData.LocalStore{ pages: %{} }
+ end
+end
+
+defimpl SData.PageStore, for: SData.LocalStore do
+ def put(store, page) do
+ hash = SData.term_hash page
+ store = %{ store | pages: Map.put(store.pages, hash, page) }
+ { hash, store }
+ end
+
+ def get(store, hash) do
+ store.pages[hash]
+ end
+
+ def copy(store, other_store, hash) do
+ page = SData.PageStore.get(other_store, hash)
+ refs = SData.Page.refs(page)
+ store = Enum.reduce(refs, store, fn x, acc -> copy(acc, other_store, x) end)
+ %{ store | pages: Map.put(store.pages, hash, page) }
+ end
+
+ def free(store, hash) do
+ %{ store | pages: Map.delete(store.pages, hash) }
+ end
+end