aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/app/blockstore.ex145
-rw-r--r--lib/app/blockstore.ex_78
-rw-r--r--lib/app/chat.ex86
-rw-r--r--lib/cli/cli.ex11
-rw-r--r--lib/data/merklesearchtree.ex109
-rw-r--r--test/mst_test.exs20
6 files changed, 318 insertions, 131 deletions
diff --git a/lib/app/blockstore.ex b/lib/app/blockstore.ex
new file mode 100644
index 0000000..1523a44
--- /dev/null
+++ b/lib/app/blockstore.ex
@@ -0,0 +1,145 @@
+defmodule SApp.BlockStore do
+ @moduledoc """
+ A module that implements a content-adressable storage (blocks, or pages,
+ identified by the hash of their contents).
+
+ This is not a shard, it is a side process that a shard may use to store its data.
+
+ TODO: WIP
+ """
+
+ use GenServer
+
+ @enforce_keys [:pid]
+ defstruct [:pid, :prefer_ask]
+
+
+ defmodule State do
+ defstruct [:shard_id, :path, :store, :reqs, :retries]
+ end
+
+
+ def start_link(shard_id, path) do
+ GenServer.start_link(__MODULE__, [shard_id, path])
+ end
+
+ def init([shard_id, path]) do
+ Shard.Manager.dispatch_to(shard_id, path, self())
+
+ {:ok, %State{shard_id: shard_id, path: path, store: %{}, reqs: %{}, retries: %{}}}
+ end
+
+ def handle_call({:get, key, prefer_ask}, from, state) do
+ case state.store[key] do
+ nil ->
+ case prefer_ask do
+ [_ | _] ->
+ for peer <- prefer_ask do
+ Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
+ end
+ _ ->
+ ask_random_peers(state, key)
+ end
+ reqs_key = case state.reqs[key] do
+ nil ->
+ MapSet.put(MapSet.new(), from)
+ ms ->
+ MapSet.put(ms, from)
+ end
+ state = put_in(state.reqs[key], reqs_key)
+ {:noreply, state}
+ v ->
+ {:reply, v, state}
+ end
+ end
+
+ def handle_call({:put, val}, _from, state) do
+ hash = SData.term_hash val
+ state = %{state | store: Map.put(state.store, hash, val)}
+ {:reply, hash, state}
+ end
+
+ def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do
+ state = case msg do
+ {:get, key} ->
+ case state.store[key] do
+ nil ->
+ Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}})
+ v ->
+ Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, v}})
+ end
+ state
+ {:info, hash, value} ->
+ if SData.term_hash value == hash do
+ reqs = case state.reqs[hash] do
+ nil -> state.reqs
+ pids ->
+ for pid <- pids do
+ GenServer.reply(pid, value)
+ end
+ Map.delete(state.reqs, hash)
+ end
+ state = %{state | retries: Map.delete(state.retries, hash)}
+ %{state | store: Map.put(state.store, hash, value), reqs: reqs}
+ else
+ state
+ end
+ {:not_found, key} ->
+ if state.reqs[key] != nil and state.store[key] == nil do
+ nretry = case state.retries[key] do
+ nil -> 1
+ n -> n+1
+ end
+ if nretry < 3 do
+ ask_random_peers(state, key)
+ %{state | retries: Map.put(state.retries, key, nretry)}
+ else
+ for pid <- state.reqs[key] do
+ GenServer.reply(pid, nil)
+ end
+ state = %{state | reqs: Map.delete(state.reqs, key)}
+ state = %{state | retries: Map.delete(state.retries, key)}
+ state
+ end
+ else
+ state
+ end
+ end
+ {:noreply, state}
+ end
+
+ def ask_random_peers(state, key) do
+ peers = :ets.lookup(:shard_peer_db, state.shard_id)
+ |> Enum.shuffle
+ |> Enum.take(3)
+ for peer <- peers do
+ Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
+ end
+ end
+
+
+ defimpl SData.PageStore do
+ def put(store, page) do
+ hash = GenServer.call(store.pid, {:put, page})
+ { hash, store }
+ end
+
+ def get(store, hash) do
+ GenServer.call(store.pid, {:get, hash, store.prefer_ask})
+ end
+
+ def copy(store, other_store, hash) do
+ page = SData.PageStore.get(other_store, hash)
+ refs = SData.Page.refs(page)
+ for ref <- refs do
+ copy(store, other_store, ref)
+ end
+ GenServer.call(store.pid, {:put, page})
+ store
+ end
+
+ def free(store, _hash) do
+ store ## DO SOMETHING???
+ end
+ end
+end
diff --git a/lib/app/blockstore.ex_ b/lib/app/blockstore.ex_
deleted file mode 100644
index 2854161..0000000
--- a/lib/app/blockstore.ex_
+++ /dev/null
@@ -1,78 +0,0 @@
-defmodule SApp.BlockStore do
- @moduledoc """
- A module that implements a content-adressable storage (blocks, or pages,
- identified by the hash of their contents).
-
- Establishes full node connectivity and uses rendez-vous hashing to select
- which nodes are responsible of a given hash.
-
- TODO: WIP
- """
-
- use GenServer
-
- defmodule State do
- defstruct [:name, :id, :manifest,
- :ncopies,
- :store, :peers]
- end
-
-
- def start_link(name) do
- GenServer.start_link(__MODULE__, name)
- end
-
- def init(name) do
- manifest = {:blockstore, name}
- id = SData.term_hash manifest
-
- GenServer.cast(Shard.Manager, {:register, id, self()})
- GenServer.cast(self(), :init_pull)
-
- {:ok, %State{name: name, id: id, manifest: manifest,
- ncopies: 3,
- store: %{}, peers: %{}}}
- end
-
- def handle_call(:manifest, _from, state) do
- {:reply, state.manifest, state}
- end
-
- def handle_call({:get, key}, from, state) do
- # TODO
- end
-
- def handle_call({:put, val}, state) do
- # TODO
- end
-
- def handle_cast({:redundant, _}, _state) do
- exit :normal
- end
-
- def handle_cast(:init_pull, state) do
- GenServer.call(SNet.Manager, :get_all)
- |> Enum.each(&(GenServer.cast(&1, {:send_msg, {:interested, [state.id]}})))
- {:noreply, state}
- end
-
- def handle_cast({:interested, peer_id, peer_pid}, state) do
- new_peers = Map.put(state.peers, peer_id, peer_pid)
- new_state = %{ state | peers: new_peers }
- initial_sync(new_state, peer_id, peer_pid)
- {:noreply, new_state}
- end
-
- def handle_cast({:msg, peer_id, peer_pid, msg}, state) do
- # TODO
- {:noreply, state}
- end
-
- defp initial_sync(state, peer_id, peer_pid) do
- # TODO
- end
-
- defp send(state, to, msg) do
- GenServer.cast(to, {:send_msg, {state.id, msg}})
- end
-end
diff --git a/lib/app/chat.ex b/lib/app/chat.ex
index e28e896..051fab6 100644
--- a/lib/app/chat.ex
+++ b/lib/app/chat.ex
@@ -19,7 +19,8 @@ defmodule SApp.Chat do
use GenServer
- alias SData.MerkleList, as: ML
+ require Logger
+ alias SData.MerkleSearchTree, as: MST
@doc """
Start a process that connects to a given channel
@@ -32,19 +33,22 @@ defmodule SApp.Chat do
Initialize channel process.
"""
def init(channel) do
- store = ML.new(&msg_cmp/2)
manifest = {:chat, channel}
id = SData.term_hash manifest
case Shard.Manager.register(id, manifest, self()) do
:ok ->
Shard.Manager.dispatch_to(id, nil, self())
+ {:ok, block_store} = SApp.BlockStore.start_link(id, :block_store)
+ mst = %MST{store: %SApp.BlockStore{pid: block_store},
+ cmp: &msg_cmp/2}
GenServer.cast(self(), :init_pull)
{:ok,
%{channel: channel,
id: id,
manifest: manifest,
- store: store,
+ block_store: block_store,
+ mst: mst,
subs: MapSet.new,
}
}
@@ -60,8 +64,8 @@ defmodule SApp.Chat do
{:reply, state.manifest, state}
end
- def handle_call({:read_history, start, num}, _from, state) do
- ret = ML.read(state.store, start, num)
+ def handle_call({:read_history, top_bound, num}, _from, state) do
+ ret = MST.last(state.mst, top_bound, num)
{:reply, ret, state}
end
@@ -86,7 +90,9 @@ defmodule SApp.Chat do
msgitem = {(System.os_time :seconds),
Shard.Identity.get_nickname(),
msg}
- new_state = %{state | store: ML.insert(state.store, msgitem)}
+ prev_root = state.mst.root
+ mst = MST.insert(state.mst, msgitem)
+ state = %{state | mst: mst}
for pid <- state.subs do
if Process.alive?(pid) do
@@ -94,11 +100,12 @@ defmodule SApp.Chat do
end
end
+ notif = {:append, prev_root, msgitem, mst.root}
for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do
- push_messages(new_state, peer_id, nil, 5)
+ Shard.Manager.send(peer_id, {state.id, nil, notif})
end
- {:noreply, new_state}
+ {:noreply, state}
end
@doc """
@@ -106,7 +113,7 @@ defmodule SApp.Chat do
connected to asks to recieve data for this channel.
"""
def handle_cast({:interested, peer_id}, state) do
- push_messages(state, peer_id, nil, 10)
+ Shard.Manager.send(peer_id, {state.id, nil, {:root, state.mst.root}})
{:noreply, state}
end
@@ -126,27 +133,52 @@ defmodule SApp.Chat do
Merkle hash of the store of older messages.
"""
def handle_cast({:msg, peer_id, _shard_id, nil, msg}, state) do
- case msg do
+ state = case msg do
{:get_manifest} ->
Shard.Manager.send(peer_id, {state.id, nil, {:manifest, state.manifest}})
- {:get, start} -> push_messages(state, peer_id, start, 20)
- {:info, _start, list, rest} ->
- if rest != nil and not ML.has(state.store, rest) do
- Shard.Manager.send(peer_id, {state.id, nil, {:get, rest}})
+ state
+ {:append, prev_root, msgitem, new_root} ->
+ # Append message: one single mesage has arrived
+ if new_root == state.mst.root do
+ # We already have the message, do nothing
+ state
+ else
+ # Try adding the message
+ if prev_root == state.mst.root do
+ mst2 = MST.insert(state.mst, msgitem)
+ if mst2.root == new_root do
+ # This was the only message missing, we are happy!
+ state = %{state | mst: mst2}
+ msg_callback(state, msgitem)
+ state
+ else
+ # More messages are missing, start a full merge
+ init_merge(state, new_root, peer_id)
+ end
+ else
+ init_merge(state, new_root, peer_id)
+ end
+ end
+ {:root, new_root} ->
+ if new_root == state.mst.root do
+ # already up to date, ignore
+ state
+ else
+ init_merge(state, new_root, peer_id)
end
- who = self()
- spawn_link(fn ->
- Process.sleep 1000
- GenServer.cast(who, {:deferred_insert, list})
- end)
- _ -> nil
+ x ->
+ Logger.info("Unhandled message: #{inspect x}")
+ state
end
{:noreply, state}
end
- def handle_cast({:deferred_insert, list}, state) do
- new_store = ML.insert_many(state.store, list, (fn msg -> msg_callback(state, msg) end))
- {:noreply, %{state | store: new_store}}
+ defp init_merge(state, new_root, source_peer) do
+ # TODO: make the merge asynchronous
+ mgmst = %{state.mst | root: new_root}
+ mgmst = put_in(mgmst.store.prefer_ask, [source_peer])
+ mst = MST.merge(state.mst, mgmst, fn msgitem, true -> msg_callback(state, msgitem) end)
+ %{state | mst: mst}
end
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
@@ -154,14 +186,6 @@ defmodule SApp.Chat do
{:noreply, %{ state | subs: new_subs }}
end
- defp push_messages(state, to, start, num) do
- case ML.read(state.store, start, num) do
- {:ok, list, rest} ->
- Shard.Manager.send(to, {state.id, nil, {:info, start, list, rest}})
- _ -> nil
- end
- end
-
defp msg_callback(state, {ts, nick, msg}) do
for pid <- state.subs do
if Process.alive?(pid) do
diff --git a/lib/cli/cli.ex b/lib/cli/cli.ex
index 8928040..2fbf8c2 100644
--- a/lib/cli/cli.ex
+++ b/lib/cli/cli.ex
@@ -64,15 +64,10 @@ defmodule SCLI do
end
defp handle_command(pid, ["hist"]) do
- case GenServer.call(pid, {:read_history, nil, 100}) do
- {:ok, list, _rest} ->
- list
- |> Enum.reverse
- |> Enum.each(fn {ts, nick, msg} ->
- IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}"
+ GenServer.call(pid, {:read_history, nil, 25})
+ |> Enum.each(fn {{ts, nick, msg}, true} ->
+ IO.puts "#{ts |> DateTime.from_unix! |> DateTime.to_iso8601} <#{nick}> #{msg}"
end)
- _ -> nil
- end
pid
end
diff --git a/lib/data/merklesearchtree.ex b/lib/data/merklesearchtree.ex
index 0554f82..b751a08 100644
--- a/lib/data/merklesearchtree.ex
+++ b/lib/data/merklesearchtree.ex
@@ -48,17 +48,15 @@ defmodule SData.MerkleSearchTree do
end
-
+ @doc"""
+ Insert an item into the search tree.
+ """
def insert(state, key, value \\ true) do
level = calc_level(key)
{hash, store} = insert_at(state, state.store, state.root, key, level, value)
%{ state | root: hash, store: store }
end
- def remove(_state, _key) do
- # TODO
- end
-
defp insert_at(s, store, root, key, level, value) do
{new_page, store} = if root == nil do
{ %Page{ level: level, low: nil, list: [ { key, value, nil } ] }, store }
@@ -190,34 +188,118 @@ defmodule SData.MerkleSearchTree do
end
end
- def get(state, key) do
- get(state, state.store, state.root, key)
+ @doc"""
+ Merge values from another MST in this MST
+ """
+ def merge(to, from, callback \\ fn _, _ -> nil end) do
+ { store, root } = merge_aux(to, from, to.store, to.root, from.root, callback)
+ %{ to | store: store, root: root }
+ end
+
+ defp merge_aux(s1, s2, store, r1, r2, callback) do
+ case {r1, r2} do
+ {_, nil} ->
+ { store, r1 }
+ {nil, _} ->
+ store = Store.copy(store, s2.store, r2)
+ rec_callback(store, r2, callback)
+ { store, r2 }
+ _ ->
+ IO.puts("not implemented: complex merge step")
+ #TODO
+ { store, r1 }
+ end
end
- defp get(s, store, root, key) do
+ defp rec_callback(store, root, callback) do
case root do
nil -> nil
_ ->
%Page{ level: _, low: low, list: lst } = Store.get(store, root)
- get_aux(s, store, low, lst, key)
+ rec_callback(store, low, callback)
+ for {k, v, rst} <- lst do
+ callback.(k, v)
+ rec_callback(store, rst, callback)
+ end
+ end
+ end
+
+ @doc"""
+ Get value for a specific key in search tree.
+ """
+ def get(state, key) do
+ get(state, state.root, key)
+ end
+
+ defp get(s, root, key) do
+ case root do
+ nil -> nil
+ _ ->
+ %Page{ level: _, low: low, list: lst } = Store.get(s.store, root)
+ get_aux(s, low, lst, key)
end
end
- defp get_aux(s, store, low, lst, key) do
+ defp get_aux(s, low, lst, key) do
case lst do
[] ->
- get(s, store, low, key)
+ get(s, low, key)
[ {k, v, low2} | rst ] ->
case s.cmp.(key, k) do
:duplicate -> v
:before ->
- get(s, store, low, key)
+ get(s, low, key)
:after ->
- get_aux(s, store, low2, rst, key)
+ get_aux(s, low2, rst, key)
end
end
end
+ @doc"""
+ Get the last n items of the tree, or the last n items
+ strictly before given upper bound if non nil
+ """
+ def last(state, top_bound, num) do
+ last(state, state.root, top_bound, num)
+ end
+
+ defp last(s, root, top_bound, num) do
+ case root do
+ nil -> []
+ _ ->
+ %Page{ level: _, low: low, list: lst } = Store.get(s.store, root)
+ last_aux(s, low, lst, top_bound, num)
+ end
+ end
+
+ defp last_aux(s, low, lst, top_bound, num) do
+ case lst do
+ [] ->
+ last(s, low, top_bound, num)
+ [ {k, v, low2} | rst ] ->
+ if top_bound == nil or s.cmp.(top_bound, k) == :after do
+ items = last_aux(s, low2, rst, top_bound, num)
+ items = if Enum.count(items) < num do
+ [ {k, v} | items ]
+ else
+ items
+ end
+ cnt = Enum.count items
+ if cnt < num do
+ last(s, low, top_bound, num - cnt) ++ items
+ else
+ items
+ end
+ else
+ last(s, low, top_bound, num)
+ end
+ end
+ end
+
+
+ @doc"""
+ Dump Merkle search tree structure.
+ """
def dump(state) do
dump(state.store, state.root, "")
end
@@ -251,5 +333,4 @@ defmodule SData.MerkleSearchTree do
defp count_leading_zeroes(_) do
0
end
-
end
diff --git a/test/mst_test.exs b/test/mst_test.exs
index 73b4f63..7fe340e 100644
--- a/test/mst_test.exs
+++ b/test/mst_test.exs
@@ -107,5 +107,25 @@ defmodule ShardTest.MST do
IO.puts "y.root: #{y.root|>Base.encode16}"
IO.puts "z.root: #{z.root|>Base.encode16}"
assert y.root == z.root
+
+ MST.last(y, nil, 10)
+ end
+
+ test "merkle search tree 5" do
+ y = Enum.reduce(0..1000, %MST{},
+ fn i, acc -> MST.insert(acc, i) end)
+
+ assert(MST.last(y, nil, 2) == [{999, true}, {1000, true}])
+ assert(MST.last(y, 42, 2) == [{40, true}, {41, true}])
+
+ stuff = for i <- 100..199, do: {i, true}
+ assert MST.last(y, 200, 100) == stuff
+
+ stuff = for i <- 200..299, do: {i, true}
+ assert MST.last(y, 300, 100) == stuff
+
+ stuff = for i <- 200..499, do: {i, true}
+ assert MST.last(y, 500, 300) == stuff
end
+
end