diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/app/chat.ex | 22 | ||||
-rw-r--r-- | lib/data/merklelist.ex | 117 | ||||
-rw-r--r-- | lib/manager.ex | 9 |
3 files changed, 90 insertions, 58 deletions
diff --git a/lib/app/chat.ex b/lib/app/chat.ex index 696e53c..b93e7b3 100644 --- a/lib/app/chat.ex +++ b/lib/app/chat.ex @@ -19,6 +19,8 @@ defmodule SApp.Chat do use GenServer + alias SData.MerkleList, as: ML + @doc """ Start a process that connects to a given channel """ @@ -30,7 +32,7 @@ defmodule SApp.Chat do Initialize channel process. """ def init(channel) do - {:ok, store} = SData.MerkleList.start_link(&msg_cmp/2) + store = ML.new(&msg_cmp/2) manifest = {:chat, channel} id = SData.term_hash manifest @@ -76,13 +78,13 @@ defmodule SApp.Chat do msgitem = {(System.os_time :seconds), Shard.Identity.get_nickname(), msg} - GenServer.cast(state.store, {:insert, msgitem}) + new_state = %{state | store: ML.insert(state.store, msgitem)} for peer <- state.peers do - push_messages(state, peer, nil, 5) + push_messages(new_state, peer, nil, 5) end - {:noreply, state} + {:noreply, new_state} end @doc """ @@ -110,12 +112,13 @@ defmodule SApp.Chat do Shard.Manager.send(peer_id, {state.id, {:manifest, state.manifest}}) {:get, start} -> push_messages(peer_id, state, start, 20) {:info, _start, list, rest} -> - if rest != nil and not GenServer.call(state.store, {:has, rest}) do + if rest != nil and not ML.has(state.store, rest) do Shard.Manager.send(peer_id, {state.id, {:get, rest}}) end + who = self() spawn_link(fn -> Process.sleep 1000 - GenServer.cast(state.store, {:insert_many, list, (fn msg -> msg_callback(state.channel, msg) end)}) + GenServer.cast(who, {:deferred_insert, list}) end) _ -> nil end @@ -127,8 +130,13 @@ defmodule SApp.Chat do end end + def handle_cast({:deferred_insert, list}, state) do + new_store = ML.insert_many(state.store, list, (fn msg -> msg_callback(state.channel, msg) end)) + %{state | store: new_store} + end + defp push_messages(state, to, start, num) do - case GenServer.call(state.store, {:read, start, num}) do + case ML.read(state.store, start, num) do {:ok, list, rest} -> Shard.Manager.send(to, {state.id, {:info, start, list, rest}}) _ -> nil diff --git a/lib/data/merklelist.ex b/lib/data/merklelist.ex index 727f2a8..71483bd 100644 --- a/lib/data/merklelist.ex +++ b/lib/data/merklelist.ex @@ -1,50 +1,45 @@ defmodule SData.MerkleList do - @moduledoc """ + @moduledoc""" A simple Merkle list store. Future improvements: - - When messages are inserted other than at the top, all intermediate hashes - change. Keep track of the mapping from old hashes to new hashes so that get - requests can work even for hashes that are not valid anymore. - - group items in "pages" (bigger bundles) + - When messages are inserted other than at the top, all intermediate hashes + change. Keep track of the mapping from old hashes to new hashes so that get + requests can work even for hashes that are not valid anymore. + - group items in "pages" (bigger bundles) """ - use GenServer + defstruct [:root, :top, :cmp, :store] - @doc """ - Start a Merkle List storage process. + @doc""" + Create a Merkle list store. - - `cmp` is a function that compares stored items and provides a total order. - It must return: - - `:after` if the first argument is more recent - - `:duplicate` if the two items are the same - - `:before` if the first argument is older + `cmp` is a function that compares stored items and provides a total order. + It must return: + - `:after` if the first argument is more recent + - `:duplicate` if the two items are the same + - `:before` if the first argument is older """ - def start_link(cmp) do - GenServer.start_link(__MODULE__, cmp) - end - - def init(cmp) do + def new(cmp) do root_item = :root root_hash = SData.term_hash root_item - state = %{ + state = %SData.MerkleList{ root: root_hash, top: root_hash, cmp: cmp, store: %{ root_hash => root_item } } - {:ok, state} + state end - defp state_push(item, state) do + defp push(state, item) do new_item = {item, state.top} new_item_hash = SData.term_hash new_item new_store = Map.put(state.store, new_item_hash, new_item) %{ state | :top => new_item_hash, :store => new_store } end - defp state_pop(state) do + defp pop(state) do if state.top == state.root do :error else @@ -55,59 +50,81 @@ defmodule SData.MerkleList do end end - defp insert_many(state, [], _callback) do + @doc""" + Insert a list of items in the store. + + A callback function may be specified that is called on any item + that is sucessfully added, i.e. that wasn't present in the store before. + """ + def insert_many(state, items, callback \\ (fn _ -> nil end)) do + items_sorted = Enum.sort(items, fn (x, y) -> state.cmp.(x, y) == :after end) + insert_many_aux(state, items_sorted, callback) + end + + defp insert_many_aux(state, [], _callback) do state end - defp insert_many(state, [item | rest], callback) do - case state_pop(state) do + defp insert_many_aux(state, [item | rest], callback) do + case pop(state) do :error -> - new_state = state_push(item, insert_many(state, rest, callback)) + new_state = push(insert_many_aux(state, rest, callback), item) callback.(item) new_state {:ok, front, state_rest} -> case state.cmp.(item, front) do :after -> - new_state = state_push(item, insert_many(state, rest, callback)) + new_state = push(insert_many_aux(state, rest, callback), item) callback.(item) new_state - :duplicate -> insert_many(state, rest, callback) - :before -> state_push(front, insert_many(state_rest, [item | rest], callback)) + :duplicate -> insert_many_aux(state, rest, callback) + :before -> push(insert_many_aux(state_rest, [item | rest], callback), item) end end end - def handle_cast({:insert, item}, state) do - handle_cast({:insert_many, [item]}, state) - end + @doc""" + Insert a single item in the store. - def handle_cast({:insert_many, items}, state) do - handle_cast({:insert_many, items, fn _ -> nil end}, state) + A callback function may be specified that is called on the item + if it is sucessfully added, i.e. it wasn't present in the store before. + """ + def insert(state, item, callback \\ (fn _ -> nil end)) do + insert_many(state, [item], callback) end - def handle_cast({:insert_many, items, callback}, state) do - items_sorted = Enum.sort(items, fn (x, y) -> state.cmp.(x, y) == :after end) - new_state = insert_many(state, items_sorted, callback) - {:noreply, new_state} - end + @doc""" + Read some items from the state. - def handle_call({:read, qbegin, qlimit}, _from, state) do + The two parameters are optional: + - qbegin : hash of the first item to read + - qlimit : number of items to read + """ + def read(state, qbegin \\ nil, qlimit \\ nil) do begin = qbegin || state.top limit = qlimit || 20 - items = get_items_list(state, begin, limit) - {:reply, items, state} + get_items_list(state, begin, limit) end - def handle_call(:top, _from, state) do - {:reply, state.top, state} + @doc""" + Get the hash of the last item + """ + def top(state) do + state.top end - def handle_call(:root, _from, state) do - {:reply, state.root, state} + @doc""" + Get the hash of the root item + """ + def root(state) do + state.root end - def handle_call({:has, hash}, _from, state) do - {:reply, Map.has_key?(state.store, hash), state} + @doc""" + Check if the store holds a certain item + """ + def has(state, hash) do + Map.has_key?(state.store, hash) end defp get_items_list(state, begin, limit) do @@ -128,7 +145,7 @@ defmodule SData.MerkleList do end end - @doc """ + @doc""" Compare function for timestamped strings """ def cmp_ts_str({ts1, str1}, {ts2, str2}) do diff --git a/lib/manager.ex b/lib/manager.ex index 07295fa..45aae5f 100644 --- a/lib/manager.ex +++ b/lib/manager.ex @@ -1,6 +1,6 @@ defmodule Shard.Manager do @moduledoc""" - Maintains three tables : + Maintains two important tables : - :peer_db @@ -12,11 +12,18 @@ defmodule Shard.Manager do List of { id, manifest, shard_pid } + And also some others : + - :peer_shard_db Mult-list of { peer_id, shard_id } + - :outbox + + Multi-list of + { peer_id, message } + """ use GenServer |