aboutsummaryrefslogtreecommitdiff
path: root/lib/app
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2018-09-01 16:06:23 +0200
committerAlex Auvolat <alex@adnab.me>2018-09-01 16:06:23 +0200
commitc6ec33d6e612168e14d77007915a4ea423c55a2e (patch)
tree8b5645651a0cc991b8ac9c68c388d84c8dbe73d2 /lib/app
parent1a0ef154a421af60f6d57dfe861dacb844a7d142 (diff)
downloadshard-c6ec33d6e612168e14d77007915a4ea423c55a2e.tar.gz
shard-c6ec33d6e612168e14d77007915a4ea423c55a2e.zip
Move everything to subdirectory
Diffstat (limited to 'lib/app')
-rw-r--r--lib/app/blockstore.ex149
-rw-r--r--lib/app/chat.ex208
2 files changed, 0 insertions, 357 deletions
diff --git a/lib/app/blockstore.ex b/lib/app/blockstore.ex
deleted file mode 100644
index 8e4fddc..0000000
--- a/lib/app/blockstore.ex
+++ /dev/null
@@ -1,149 +0,0 @@
-defmodule SApp.BlockStore do
- @moduledoc """
- A module that implements a content-adressable storage (blocks, or pages,
- identified by the hash of their contents).
-
- This is not a shard, it is a side process that a shard may use to store its data.
-
- TODO: WIP
- """
-
- use GenServer
-
- @enforce_keys [:pid]
- defstruct [:pid, :prefer_ask]
-
-
- defmodule State do
- defstruct [:shard_id, :path, :store, :reqs, :retries]
- end
-
-
- def start_link(shard_id, path) do
- GenServer.start_link(__MODULE__, [shard_id, path])
- end
-
- def init([shard_id, path]) do
- Shard.Manager.dispatch_to(shard_id, path, self())
-
- {:ok, %State{shard_id: shard_id, path: path, store: %{}, reqs: %{}, retries: %{}}}
- end
-
- def handle_call({:get, key, prefer_ask}, from, state) do
- case state.store[key] do
- nil ->
- case prefer_ask do
- [_ | _] ->
- for peer <- prefer_ask do
- Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
- end
- _ ->
- ask_random_peers(state, key)
- end
- reqs_key = case state.reqs[key] do
- nil ->
- MapSet.put(MapSet.new(), from)
- ms ->
- MapSet.put(ms, from)
- end
- state = put_in(state.reqs[key], reqs_key)
- {:noreply, state}
- v ->
- {:reply, v, state}
- end
- end
-
- def handle_call({:put, val}, _from, state) do
- hash = SData.term_hash val
- state = %{state | store: Map.put(state.store, hash, val)}
- {:reply, hash, state}
- end
-
- def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do
- state = case msg do
- {:get, key} ->
- case state.store[key] do
- nil ->
- Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}})
- v ->
- Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, v}})
- end
- state
- {:info, hash, value} ->
- if SData.term_hash value == hash do
- reqs = case state.reqs[hash] do
- nil -> state.reqs
- pids ->
- for pid <- pids do
- GenServer.reply(pid, value)
- end
- Map.delete(state.reqs, hash)
- end
- state = %{state | retries: Map.delete(state.retries, hash)}
- %{state | store: Map.put(state.store, hash, value), reqs: reqs}
- else
- state
- end
- {:not_found, key} ->
- if state.reqs[key] != nil and state.store[key] == nil do
- nretry = case state.retries[key] do
- nil -> 1
- n -> n+1
- end
- if nretry < 3 do
- ask_random_peers(state, key)
- %{state | retries: Map.put(state.retries, key, nretry)}
- else
- for pid <- state.reqs[key] do
- GenServer.reply(pid, nil)
- end
- state = %{state | reqs: Map.delete(state.reqs, key)}
- state = %{state | retries: Map.delete(state.retries, key)}
- state
- end
- else
- state
- end
- end
- {:noreply, state}
- end
-
- def ask_random_peers(state, key) do
- peers = :ets.lookup(:shard_peer_db, state.shard_id)
- |> Enum.shuffle
- |> Enum.take(3)
- for {_, peer} <- peers do
- Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
- end
- end
-
-
- defimpl SData.PageStore do
- def put(store, page) do
- hash = GenServer.call(store.pid, {:put, page})
- { hash, store }
- end
-
- def get(store, hash) do
- try do
- GenServer.call(store.pid, {:get, hash, store.prefer_ask})
- catch
- :exit, {:timeout, _} -> nil
- end
- end
-
- def copy(store, other_store, hash) do
- page = SData.PageStore.get(other_store, hash)
- refs = SData.Page.refs(page)
- for ref <- refs do
- copy(store, other_store, ref)
- end
- GenServer.call(store.pid, {:put, page})
- store
- end
-
- def free(store, _hash) do
- store ## DO SOMETHING???
- end
- end
-end
diff --git a/lib/app/chat.ex b/lib/app/chat.ex
deleted file mode 100644
index 051fab6..0000000
--- a/lib/app/chat.ex
+++ /dev/null
@@ -1,208 +0,0 @@
-defmodule SApp.Chat do
- @moduledoc """
- Shard application for a replicated chat room with full history.
-
- Chat rooms are globally identified by their channel name.
- A chat room manifest is of the form:
-
- {:chat, channel_name}
-
- Future improvements:
- - message signing
- - storage of the chatroom messages to disk
- - storage of the known peers that have this channel to disk
- - use a DHT to find peers that are interested in this channel
- - epidemic broadcast (carefull not to be too costly,
- maybe by limiting the number of peers we talk to)
- - partial synchronization only == data distributed over peers
- """
-
- use GenServer
-
- require Logger
- alias SData.MerkleSearchTree, as: MST
-
- @doc """
- Start a process that connects to a given channel
- """
- def start_link(channel) do
- GenServer.start_link(__MODULE__, channel)
- end
-
- @doc """
- Initialize channel process.
- """
- def init(channel) do
- manifest = {:chat, channel}
- id = SData.term_hash manifest
-
- case Shard.Manager.register(id, manifest, self()) do
- :ok ->
- Shard.Manager.dispatch_to(id, nil, self())
- {:ok, block_store} = SApp.BlockStore.start_link(id, :block_store)
- mst = %MST{store: %SApp.BlockStore{pid: block_store},
- cmp: &msg_cmp/2}
- GenServer.cast(self(), :init_pull)
- {:ok,
- %{channel: channel,
- id: id,
- manifest: manifest,
- block_store: block_store,
- mst: mst,
- subs: MapSet.new,
- }
- }
- :redundant ->
- exit(:redundant)
- end
- end
-
- @doc """
- Implementation of the :manifest call that returns the chat room's manifest
- """
- def handle_call(:manifest, _from, state) do
- {:reply, state.manifest, state}
- end
-
- def handle_call({:read_history, top_bound, num}, _from, state) do
- ret = MST.last(state.mst, top_bound, num)
- {:reply, ret, state}
- end
-
- @doc """
- Implementation of the :init_pull handler, which is called when the
- process starts. It contacts all currently connected peers and asks them to
- send data for this channel if they have some.
- """
- def handle_cast(:init_pull, state) do
- for {_, pid, _, _} <- :ets.tab2list(:peer_db) do
- GenServer.cast(pid, {:send_msg, {:interested, [state.id]}})
- end
- {:noreply, state}
- end
-
- @doc """
- Implementation of the :chat_send handler. This is the main handler that is used
- to send a message to the chat room. Puts the message in the store and syncs
- with all connected peers.
- """
- def handle_cast({:chat_send, msg}, state) do
- msgitem = {(System.os_time :seconds),
- Shard.Identity.get_nickname(),
- msg}
- prev_root = state.mst.root
- mst = MST.insert(state.mst, msgitem)
- state = %{state | mst: mst}
-
- for pid <- state.subs do
- if Process.alive?(pid) do
- send(pid, {:chat_send, state.channel, msgitem})
- end
- end
-
- notif = {:append, prev_root, msgitem, mst.root}
- for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do
- Shard.Manager.send(peer_id, {state.id, nil, notif})
- end
-
- {:noreply, state}
- end
-
- @doc """
- Implementation of the :interested handler, this is called when a peer we are
- connected to asks to recieve data for this channel.
- """
- def handle_cast({:interested, peer_id}, state) do
- Shard.Manager.send(peer_id, {state.id, nil, {:root, state.mst.root}})
- {:noreply, state}
- end
-
- def handle_cast({:subscribe, pid}, state) do
- Process.monitor(pid)
- new_subs = MapSet.put(state.subs, pid)
- {:noreply, %{ state | subs: new_subs }}
- end
-
- @doc """
- Implementation of the :msg handler, which is the main handler for messages
- comming from other peers concerning this chat room.
-
- Messages are:
- - `{:get, start}`: get some messages starting at a given Merkle hash
- - `{:info, start, list, rest}`: put some messages and informs of the
- Merkle hash of the store of older messages.
- """
- def handle_cast({:msg, peer_id, _shard_id, nil, msg}, state) do
- state = case msg do
- {:get_manifest} ->
- Shard.Manager.send(peer_id, {state.id, nil, {:manifest, state.manifest}})
- state
- {:append, prev_root, msgitem, new_root} ->
- # Append message: one single mesage has arrived
- if new_root == state.mst.root do
- # We already have the message, do nothing
- state
- else
- # Try adding the message
- if prev_root == state.mst.root do
- mst2 = MST.insert(state.mst, msgitem)
- if mst2.root == new_root do
- # This was the only message missing, we are happy!
- state = %{state | mst: mst2}
- msg_callback(state, msgitem)
- state
- else
- # More messages are missing, start a full merge
- init_merge(state, new_root, peer_id)
- end
- else
- init_merge(state, new_root, peer_id)
- end
- end
- {:root, new_root} ->
- if new_root == state.mst.root do
- # already up to date, ignore
- state
- else
- init_merge(state, new_root, peer_id)
- end
- x ->
- Logger.info("Unhandled message: #{inspect x}")
- state
- end
- {:noreply, state}
- end
-
- defp init_merge(state, new_root, source_peer) do
- # TODO: make the merge asynchronous
- mgmst = %{state.mst | root: new_root}
- mgmst = put_in(mgmst.store.prefer_ask, [source_peer])
- mst = MST.merge(state.mst, mgmst, fn msgitem, true -> msg_callback(state, msgitem) end)
- %{state | mst: mst}
- end
-
- def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
- new_subs = MapSet.delete(state.subs, pid)
- {:noreply, %{ state | subs: new_subs }}
- end
-
- defp msg_callback(state, {ts, nick, msg}) do
- for pid <- state.subs do
- if Process.alive?(pid) do
- send(pid, {:chat_recv, state.channel, {ts, nick, msg}})
- end
- end
- end
-
- defp msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do
- cond do
- ts1 > ts2 -> :after
- ts1 < ts2 -> :before
- nick1 > nick2 -> :after
- nick1 < nick2 -> :before
- msg1 > msg2 -> :after
- msg1 < msg2 -> :before
- true -> :duplicate
- end
- end
-end