aboutsummaryrefslogtreecommitdiff
path: root/shard/lib/app
diff options
context:
space:
mode:
Diffstat (limited to 'shard/lib/app')
-rw-r--r--shard/lib/app/blockstore.ex149
-rw-r--r--shard/lib/app/chat.ex208
2 files changed, 357 insertions, 0 deletions
diff --git a/shard/lib/app/blockstore.ex b/shard/lib/app/blockstore.ex
new file mode 100644
index 0000000..8e4fddc
--- /dev/null
+++ b/shard/lib/app/blockstore.ex
@@ -0,0 +1,149 @@
+defmodule SApp.BlockStore do
+ @moduledoc """
+ A module that implements a content-adressable storage (blocks, or pages,
+ identified by the hash of their contents).
+
+ This is not a shard, it is a side process that a shard may use to store its data.
+
+ TODO: WIP
+ """
+
+ use GenServer
+
+ @enforce_keys [:pid]
+ defstruct [:pid, :prefer_ask]
+
+
+ defmodule State do
+ defstruct [:shard_id, :path, :store, :reqs, :retries]
+ end
+
+
+ def start_link(shard_id, path) do
+ GenServer.start_link(__MODULE__, [shard_id, path])
+ end
+
+ def init([shard_id, path]) do
+ Shard.Manager.dispatch_to(shard_id, path, self())
+
+ {:ok, %State{shard_id: shard_id, path: path, store: %{}, reqs: %{}, retries: %{}}}
+ end
+
+ def handle_call({:get, key, prefer_ask}, from, state) do
+ case state.store[key] do
+ nil ->
+ case prefer_ask do
+ [_ | _] ->
+ for peer <- prefer_ask do
+ Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
+ end
+ _ ->
+ ask_random_peers(state, key)
+ end
+ reqs_key = case state.reqs[key] do
+ nil ->
+ MapSet.put(MapSet.new(), from)
+ ms ->
+ MapSet.put(ms, from)
+ end
+ state = put_in(state.reqs[key], reqs_key)
+ {:noreply, state}
+ v ->
+ {:reply, v, state}
+ end
+ end
+
+ def handle_call({:put, val}, _from, state) do
+ hash = SData.term_hash val
+ state = %{state | store: Map.put(state.store, hash, val)}
+ {:reply, hash, state}
+ end
+
+ def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do
+ state = case msg do
+ {:get, key} ->
+ case state.store[key] do
+ nil ->
+ Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}})
+ v ->
+ Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, v}})
+ end
+ state
+ {:info, hash, value} ->
+ if SData.term_hash value == hash do
+ reqs = case state.reqs[hash] do
+ nil -> state.reqs
+ pids ->
+ for pid <- pids do
+ GenServer.reply(pid, value)
+ end
+ Map.delete(state.reqs, hash)
+ end
+ state = %{state | retries: Map.delete(state.retries, hash)}
+ %{state | store: Map.put(state.store, hash, value), reqs: reqs}
+ else
+ state
+ end
+ {:not_found, key} ->
+ if state.reqs[key] != nil and state.store[key] == nil do
+ nretry = case state.retries[key] do
+ nil -> 1
+ n -> n+1
+ end
+ if nretry < 3 do
+ ask_random_peers(state, key)
+ %{state | retries: Map.put(state.retries, key, nretry)}
+ else
+ for pid <- state.reqs[key] do
+ GenServer.reply(pid, nil)
+ end
+ state = %{state | reqs: Map.delete(state.reqs, key)}
+ state = %{state | retries: Map.delete(state.retries, key)}
+ state
+ end
+ else
+ state
+ end
+ end
+ {:noreply, state}
+ end
+
+ def ask_random_peers(state, key) do
+ peers = :ets.lookup(:shard_peer_db, state.shard_id)
+ |> Enum.shuffle
+ |> Enum.take(3)
+ for {_, peer} <- peers do
+ Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
+ end
+ end
+
+
+ defimpl SData.PageStore do
+ def put(store, page) do
+ hash = GenServer.call(store.pid, {:put, page})
+ { hash, store }
+ end
+
+ def get(store, hash) do
+ try do
+ GenServer.call(store.pid, {:get, hash, store.prefer_ask})
+ catch
+ :exit, {:timeout, _} -> nil
+ end
+ end
+
+ def copy(store, other_store, hash) do
+ page = SData.PageStore.get(other_store, hash)
+ refs = SData.Page.refs(page)
+ for ref <- refs do
+ copy(store, other_store, ref)
+ end
+ GenServer.call(store.pid, {:put, page})
+ store
+ end
+
+ def free(store, _hash) do
+ store ## DO SOMETHING???
+ end
+ end
+end
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex
new file mode 100644
index 0000000..051fab6
--- /dev/null
+++ b/shard/lib/app/chat.ex
@@ -0,0 +1,208 @@
+defmodule SApp.Chat do
+ @moduledoc """
+ Shard application for a replicated chat room with full history.
+
+ Chat rooms are globally identified by their channel name.
+ A chat room manifest is of the form:
+
+ {:chat, channel_name}
+
+ Future improvements:
+ - message signing
+ - storage of the chatroom messages to disk
+ - storage of the known peers that have this channel to disk
+ - use a DHT to find peers that are interested in this channel
+ - epidemic broadcast (carefull not to be too costly,
+ maybe by limiting the number of peers we talk to)
+ - partial synchronization only == data distributed over peers
+ """
+
+ use GenServer
+
+ require Logger
+ alias SData.MerkleSearchTree, as: MST
+
+ @doc """
+ Start a process that connects to a given channel
+ """
+ def start_link(channel) do
+ GenServer.start_link(__MODULE__, channel)
+ end
+
+ @doc """
+ Initialize channel process.
+ """
+ def init(channel) do
+ manifest = {:chat, channel}
+ id = SData.term_hash manifest
+
+ case Shard.Manager.register(id, manifest, self()) do
+ :ok ->
+ Shard.Manager.dispatch_to(id, nil, self())
+ {:ok, block_store} = SApp.BlockStore.start_link(id, :block_store)
+ mst = %MST{store: %SApp.BlockStore{pid: block_store},
+ cmp: &msg_cmp/2}
+ GenServer.cast(self(), :init_pull)
+ {:ok,
+ %{channel: channel,
+ id: id,
+ manifest: manifest,
+ block_store: block_store,
+ mst: mst,
+ subs: MapSet.new,
+ }
+ }
+ :redundant ->
+ exit(:redundant)
+ end
+ end
+
+ @doc """
+ Implementation of the :manifest call that returns the chat room's manifest
+ """
+ def handle_call(:manifest, _from, state) do
+ {:reply, state.manifest, state}
+ end
+
+ def handle_call({:read_history, top_bound, num}, _from, state) do
+ ret = MST.last(state.mst, top_bound, num)
+ {:reply, ret, state}
+ end
+
+ @doc """
+ Implementation of the :init_pull handler, which is called when the
+ process starts. It contacts all currently connected peers and asks them to
+ send data for this channel if they have some.
+ """
+ def handle_cast(:init_pull, state) do
+ for {_, pid, _, _} <- :ets.tab2list(:peer_db) do
+ GenServer.cast(pid, {:send_msg, {:interested, [state.id]}})
+ end
+ {:noreply, state}
+ end
+
+ @doc """
+ Implementation of the :chat_send handler. This is the main handler that is used
+ to send a message to the chat room. Puts the message in the store and syncs
+ with all connected peers.
+ """
+ def handle_cast({:chat_send, msg}, state) do
+ msgitem = {(System.os_time :seconds),
+ Shard.Identity.get_nickname(),
+ msg}
+ prev_root = state.mst.root
+ mst = MST.insert(state.mst, msgitem)
+ state = %{state | mst: mst}
+
+ for pid <- state.subs do
+ if Process.alive?(pid) do
+ send(pid, {:chat_send, state.channel, msgitem})
+ end
+ end
+
+ notif = {:append, prev_root, msgitem, mst.root}
+ for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do
+ Shard.Manager.send(peer_id, {state.id, nil, notif})
+ end
+
+ {:noreply, state}
+ end
+
+ @doc """
+ Implementation of the :interested handler, this is called when a peer we are
+ connected to asks to recieve data for this channel.
+ """
+ def handle_cast({:interested, peer_id}, state) do
+ Shard.Manager.send(peer_id, {state.id, nil, {:root, state.mst.root}})
+ {:noreply, state}
+ end
+
+ def handle_cast({:subscribe, pid}, state) do
+ Process.monitor(pid)
+ new_subs = MapSet.put(state.subs, pid)
+ {:noreply, %{ state | subs: new_subs }}
+ end
+
+ @doc """
+ Implementation of the :msg handler, which is the main handler for messages
+ comming from other peers concerning this chat room.
+
+ Messages are:
+ - `{:get, start}`: get some messages starting at a given Merkle hash
+ - `{:info, start, list, rest}`: put some messages and informs of the
+ Merkle hash of the store of older messages.
+ """
+ def handle_cast({:msg, peer_id, _shard_id, nil, msg}, state) do
+ state = case msg do
+ {:get_manifest} ->
+ Shard.Manager.send(peer_id, {state.id, nil, {:manifest, state.manifest}})
+ state
+ {:append, prev_root, msgitem, new_root} ->
+ # Append message: one single mesage has arrived
+ if new_root == state.mst.root do
+ # We already have the message, do nothing
+ state
+ else
+ # Try adding the message
+ if prev_root == state.mst.root do
+ mst2 = MST.insert(state.mst, msgitem)
+ if mst2.root == new_root do
+ # This was the only message missing, we are happy!
+ state = %{state | mst: mst2}
+ msg_callback(state, msgitem)
+ state
+ else
+ # More messages are missing, start a full merge
+ init_merge(state, new_root, peer_id)
+ end
+ else
+ init_merge(state, new_root, peer_id)
+ end
+ end
+ {:root, new_root} ->
+ if new_root == state.mst.root do
+ # already up to date, ignore
+ state
+ else
+ init_merge(state, new_root, peer_id)
+ end
+ x ->
+ Logger.info("Unhandled message: #{inspect x}")
+ state
+ end
+ {:noreply, state}
+ end
+
+ defp init_merge(state, new_root, source_peer) do
+ # TODO: make the merge asynchronous
+ mgmst = %{state.mst | root: new_root}
+ mgmst = put_in(mgmst.store.prefer_ask, [source_peer])
+ mst = MST.merge(state.mst, mgmst, fn msgitem, true -> msg_callback(state, msgitem) end)
+ %{state | mst: mst}
+ end
+
+ def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
+ new_subs = MapSet.delete(state.subs, pid)
+ {:noreply, %{ state | subs: new_subs }}
+ end
+
+ defp msg_callback(state, {ts, nick, msg}) do
+ for pid <- state.subs do
+ if Process.alive?(pid) do
+ send(pid, {:chat_recv, state.channel, {ts, nick, msg}})
+ end
+ end
+ end
+
+ defp msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do
+ cond do
+ ts1 > ts2 -> :after
+ ts1 < ts2 -> :before
+ nick1 > nick2 -> :after
+ nick1 < nick2 -> :before
+ msg1 > msg2 -> :after
+ msg1 < msg2 -> :before
+ true -> :duplicate
+ end
+ end
+end