aboutsummaryrefslogblamecommitdiff
path: root/shard/lib/data/merklesearchtree.ex
blob: 941d31d74797441676ac2becb7691de01f5b56ce (plain) (tree)
1
2
3
4
5
6
7
8
9








                                   

                                               




           

                                  

                                  
 












                                                                     















                                                        


                                        
                                          
                           
                                                                                


                                         
                                                      
                                         
                                                                               
        
                                                                          


                               
                                                         
                                                                                    
                          
                                         






                                                                                                

                         
                                         






                                                                                             


             
                                                         

     
                                    


                       
                                                                                                     
                                     
                               


                                                            

                                                              
                                 

                                                                  

                                                            




                             
                                              
               
                         



                                                    
                                       












                                                                              


       
                                                           
               
                         





















                                                                                              
       
 
                                                                      
               
                         



                                                                       
                                        











                                                                                                    


       
         




                                                                                       







                                                                                 
                                      






                                               





















                                                                                                       
                       

























                                                                                                 
       

     
                                             


                
                                                                       




















                                                                         


       
                                   

               
                        



                               
                            
                    
                                      
           


       












































                                                                         








                                     
                                                                          






















                                                           
   
defmodule SData.MerkleSearchTree do
  @moduledoc"""
  A Merkle search tree.

  A node of the tree is
    {
      level,
      hash_of_node | nil,
      [ 
        { item_low_bound, hash_of_node | nil },
        { item_low_bound, hash_of_node | nil },
        ...
      }
    }
  """

  alias SData.PageStore, as: Store

  @doc"""
  Create a new Merkle search tree.

  This structure can be used as a set with only true keys,
  or as a map if a merge function is given.

  `cmp` is a compare function for keys as defined in module `SData`.

  `merge` is a function for merging two items that have the same key.
  """
  defstruct root: nil,
            store: SData.LocalStore.new,
            cmp: &SData.cmp_term/2,
            merge: &SData.merge_true/2

          
  defmodule Page do
    defstruct [:level, :low, :list]
  end

  defimpl SData.Page, for: Page do
    def refs(page) do
      refs = for {_, _, h} <- page.list, h != nil, do: h
      if page.low != nil do
        [ page.low | refs ]
      else
        refs
      end
    end
  end


  @doc"""
    Insert an item into the search tree.
  """
  def insert(state, key, value \\ true) do
    level = calc_level(key)
    {hash, store} = insert_at(state, state.store, state.root, key, level, value)
    %{ state | root: hash, store: store }
  end

  defp insert_at(s, store, root, key, level, value) do
    {new_page, store} = if root == nil do
      { %Page{ level: level, low: nil, list: [ { key, value, nil } ] }, store }
    else
      %Page{ level: plevel, low: low, list: lst } = Store.get(store, root)
      [ { k0, _, _} | _ ] = lst
      cond do
        plevel < level ->
          {low, high, store} = split(s, store, root, key)
          { %Page{ level: level, low: low, list: [ { key, value, high } ] }, store }
        plevel == level ->
          store = Store.free(store, root)
          case s.cmp.(key, k0) do
            :before ->
              {low2a, low2b, store} = split(s, store, low, key)
              { %Page{ level: level, low: low2a, list: [ { key, value, low2b } | lst] }, store }
            _ ->
              {new_lst, store} = aux_insert_after_first(s, store, lst, key, value)
              { %Page{ level: plevel, low: low, list: new_lst }, store }
          end
        plevel > level ->
          store = Store.free(store, root)
          case s.cmp.(key, k0) do
            :before ->
              {new_low, store} = insert_at(s, store, low, key, level, value)
              { %Page{ level: plevel, low: new_low, list: lst }, store }
            :after ->
              {new_lst, store} = aux_insert_sub_after_first(s, store, lst, key, level, value)
              { %Page{ level: plevel, low: low, list: new_lst }, store }
          end
      end
    end
    Store.put(store, new_page)    # returns {hash, store}
  end

  defp split(s, store, hash, key) do
    if hash == nil do
      {nil, nil, store}
    else
      %Page{ level: level, low: low, list: lst } = Store.get(store, hash) || Store.get(s.store, hash)
      store = Store.free(store, hash)
      [ { k0, _, _} | _ ] = lst
      case s.cmp.(key, k0) do
        :before ->
          {lowlow, lowhi, store} = split(s, store, low, key)
          newp2 = %Page{ level: level, low: lowhi, list: lst }
          {newp2h, store} = Store.put(store, newp2)
          {lowlow, newp2h, store}
        :after ->
          {lst1, p2, store} = split_aux(s, store, lst, key, level)
          newp1 = %Page{ level: level, low: low, list: lst1}
          {newp1h, store} = Store.put(store, newp1)
          {newp1h, p2, store}
      end 
    end
  end

  defp split_aux(s, store, lst, key, level) do
    case lst do
      [ {k1, v1, r1} ] ->
        if s.cmp.(k1, key) == :duplicate do
          raise "Bad logic"
        end
        {r1l, r1h, store} = split(s, store, r1, key)
        { [{k1, v1, r1l}], r1h, store }
      [ {k1, v1, r1} = fst, {k2, v2, r2} = rst1 | rst ] ->
        case s.cmp.(key, k2) do
          :before ->
            {r1l, r1h, store} = split(s, store, r1, key)
            p2 = %Page{ level: level, low: r1h, list: [ {k2, v2, r2} | rst ] }
            {p2h, store} = Store.put(store, p2)
            { [{k1, v1, r1l}], p2h, store }
          :after ->
            {rst2, hi, store} = split_aux(s, store, [rst1 | rst], key, level)
            { [ fst | rst2 ], hi, store }
          :duplicate ->
            raise "Bad logic"
        end
    end
  end

  defp aux_insert_after_first(s, store, lst, key, value) do
    case lst do
      [ {k1, v1, r1} ] ->
        case s.cmp.(k1, key) do
          :duplicate ->
            { [ {k1, s.merge.(v1, value), r1} ], store }
          :before ->
            {r1a, r1b, new_store} = split(s, store, r1, key)
            { [ {k1, v1, r1a}, {key, value, r1b} ], new_store }
        end
      [ {k1, v1, r1} = fst, {k2, v2, r2} = rst1 | rst ] ->
        case s.cmp.(k1, key) do
          :duplicate ->
            { [ {k1, s.merge.(v1, value), r1}, rst1 | rst ], store }
          :before ->
            case s.cmp.(k2, key) do
              :after ->
                {r1a, r1b, new_store} = split(s, store, r1, key)
                { [ {k1, v1, r1a}, {key, value, r1b}, {k2, v2, r2} | rst ], new_store }
              _ ->
                {rst2, new_store} = aux_insert_after_first(s, store, [rst1 | rst], key, value)
                { [ fst | rst2 ], new_store }
            end
        end
      end
    end

  defp aux_insert_sub_after_first(s, store, lst, key, level, value) do
    case lst do
      [ {k1, v1, r1} ] ->
        if s.cmp.(k1, key) == :duplicate do
          raise "Bad logic"
        end
        {r1new, store_new} = insert_at(s, store, r1, key, level, value)
        { [{k1, v1, r1new}], store_new }
      [ {k1, v1, r1} = fst, {k2, _, _} = rst1 | rst ] ->
        if s.cmp.(k1, key) == :duplicate do
          raise "Bad logic"
        end
        case s.cmp.(key, k2) do
          :before ->
            {r1new, store_new} = insert_at(s, store, r1, key, level, value)
            { [{k1, v1, r1new}, rst1 | rst], store_new }
          _ ->
            {rst2, new_store} = aux_insert_sub_after_first(s, store, [rst1 |rst], key, level, value)
            { [ fst | rst2 ], new_store }
        end
    end
  end

  @doc"""
  Merge values from another MST in this MST.

  The merge is not symmetrical in the sense that:
  - new pages are added in the store of the first argument
  - the callback is called for all items found in the second argument and not the first
  """
  def merge(to, from, callback \\ fn _, _ -> nil end) do
    { store, root } = merge_aux(to, from, to.store, to.root, from.root, callback)
    %{ to | store: store, root: root }
  end

  defp merge_aux(s1, s2, store, r1, r2, callback) do
    case {r1, r2} do
      _ when r1 == r2 -> { store, r1 }
      {_, nil} ->
        { store, r1 }
      {nil, _} ->
        store = Store.copy(store, s2.store, r2)
        rec_callback(store, r2, callback)
        { store, r2 }
      _ ->
        %Page{ level: level1, low: low1, list: lst1 } = Store.get(store, r1)
        %Page{ level: level2, low: low2, list: lst2 } = Store.get(store, r2) || Store.get(s2.store, r2)
        { level, low1, lst1, low2, lst2 } = cond do
          level1 == level2 -> {level1, low1, lst1, low2, lst2}
          level1 > level2 ->  {level1, low1, lst1, r2, []}
          level2 > level1 ->  {level2, r1, [], low2, lst2}
        end
        { store, low, lst } = merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback)
        page = %Page{ level: level, low: low, list: lst }
        {hash, store} = Store.put(store, page)
        {store, hash}
    end
  end

  defp merge_aux_rec(s1, s2, store, low1, lst1, low2, lst2, callback) do
    case {lst1, lst2} do
      { [], [] } ->
        {store, hash} = merge_aux(s1, s2, store, low1, low2, callback)
        {store, hash, []}
      { [], [ {k, v, r} | rst2 ] } ->
        {low1l, low1h, store} = split(s1, store, low1, k)
        {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback)
        callback.(k, v)
        {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, [], r, rst2, callback)
        {store, newlow, [ {k, v, newr} | newrst ]}
      { [ {k, v, r} | rst1 ], [] } ->
        {low2l, low2h, store} = split(s2, store, low2, k)
        {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback)
        {store, newr, newrst} = merge_aux_rec(s1, s2, store, r, rst1, low2h, [], callback)
        {store, newlow, [ {k, v, newr} | newrst ]}
      { [ {k1, v1, r1} | rst1 ], [ {k2, v2, r2} | rst2 ] } ->
        case s1.cmp.(k1, k2) do
          :before ->
            {low2l, low2h, store} = split(s2, store, low2, k1)
            {store, newlow} = merge_aux(s1, s2, store, low1, low2l, callback)
            {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, low2h, lst2, callback)
            {store, newlow, [ {k1, v1, newr} | newrst ]}
          :after ->
            {low1l, low1h, store} = split(s1, store, low1, k2)
            {store, newlow} = merge_aux(s1, s2, store, low1l, low2, callback)
            callback.(k2, v2)
            {store, newr, newrst} = merge_aux_rec(s1, s2, store, low1h, lst1, r2, rst2, callback)
            {store, newlow, [ {k2, v2, newr} | newrst ]}
          :duplicate ->
            {store, newlow} = merge_aux(s1, s2, store, low1, low2, callback)
            newv = s1.merge.(v1, v2) ## TODO: callback here ??
            {store, newr, newrst} = merge_aux_rec(s1, s2, store, r1, rst1, r2, rst2, callback)
            {store, newlow, [ {k1, newv, newr} | newrst ]}
        end
    end
  end

  defp rec_callback(store, root, callback) do
    case root do
      nil -> nil
      _ ->
        %Page{ level: _, low: low, list: lst } = Store.get(store, root)
        rec_callback(store, low, callback)
        for {k, v, rst} <- lst do
          callback.(k, v)
          rec_callback(store, rst, callback)
        end
    end
  end

  @doc"""
    Get value for a specific key in search tree.
  """
  def get(state, key) do
    get(state, state.root, key)
  end

  defp get(s, root, key) do
    case root do
      nil -> nil
      _ ->
        %Page{ level: _, low: low, list: lst } = Store.get(s.store, root)
        get_aux(s, low, lst, key)
    end
  end

  defp get_aux(s, low, lst, key) do
    case lst do
      [] ->
        get(s, low, key)
      [ {k, v, low2} | rst ] ->
        case s.cmp.(key, k) do
          :duplicate -> v
          :before ->
            get(s, low, key)
          :after -> 
            get_aux(s, low2, rst, key)
        end
    end
  end

  @doc"""
    Get the last n items of the tree, or the last n items
    strictly before given upper bound if non nil
  """
  def last(state, top_bound, num) do
    last(state, state.root, top_bound, num)
  end

  defp last(s, root, top_bound, num) do
    case root do
      nil -> []
      _ ->
        %Page{ level: _, low: low, list: lst } = Store.get(s.store, root)
        last_aux(s, low, lst, top_bound, num)
    end
  end

  defp last_aux(s, low, lst, top_bound, num) do
    case lst do
      [] ->
        last(s, low, top_bound, num)
      [ {k, v, low2} | rst ] ->
        if top_bound == nil or s.cmp.(top_bound, k) == :after do
          items = last_aux(s, low2, rst, top_bound, num)
          items = if Enum.count(items) < num do
            [ {k, v} | items ]
          else
            items
          end
          cnt = Enum.count items
          if cnt < num do
            last(s, low, top_bound, num - cnt) ++ items
          else
            items
          end
        else
          last(s, low, top_bound, num)
        end
    end
  end


  @doc"""
    Dump Merkle search tree structure.
  """
  def dump(state) do
    dump(state.store, state.root, "")
  end

  defp dump(store, root, lvl) do
    case root do
      nil ->
        IO.puts(lvl <> "nil")
      _ ->
        %Page{ level: level, low: low, list: lst} = Store.get(store, root)
        IO.puts(lvl <> "#{root|>Base.encode16} (#{level})")
        dump(store, low, lvl <> "  ")
        for {k, v, r} <- lst do
          IO.puts(lvl<>"- #{inspect k} => #{inspect v}")
          dump(store, r, lvl <> "  ")
        end
    end
  end

  defp calc_level(key) do
    key
      |> SData.term_hash
      |> Base.encode16
      |> String.to_charlist
      |> count_leading_zeroes
  end

  defp count_leading_zeroes('0' ++ rest) do
    1 + count_leading_zeroes(rest)
  end
  defp count_leading_zeroes(_) do
    0
  end
end