diff options
author | Alex Auvolat <alex@adnab.me> | 2018-09-01 16:06:23 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2018-09-01 16:06:23 +0200 |
commit | c6ec33d6e612168e14d77007915a4ea423c55a2e (patch) | |
tree | 8b5645651a0cc991b8ac9c68c388d84c8dbe73d2 /lib/app | |
parent | 1a0ef154a421af60f6d57dfe861dacb844a7d142 (diff) | |
download | shard-c6ec33d6e612168e14d77007915a4ea423c55a2e.tar.gz shard-c6ec33d6e612168e14d77007915a4ea423c55a2e.zip |
Move everything to subdirectory
Diffstat (limited to 'lib/app')
-rw-r--r-- | lib/app/blockstore.ex | 149 | ||||
-rw-r--r-- | lib/app/chat.ex | 208 |
2 files changed, 0 insertions, 357 deletions
diff --git a/lib/app/blockstore.ex b/lib/app/blockstore.ex deleted file mode 100644 index 8e4fddc..0000000 --- a/lib/app/blockstore.ex +++ /dev/null @@ -1,149 +0,0 @@ -defmodule SApp.BlockStore do - @moduledoc """ - A module that implements a content-adressable storage (blocks, or pages, - identified by the hash of their contents). - - This is not a shard, it is a side process that a shard may use to store its data. - - TODO: WIP - """ - - use GenServer - - @enforce_keys [:pid] - defstruct [:pid, :prefer_ask] - - - defmodule State do - defstruct [:shard_id, :path, :store, :reqs, :retries] - end - - - def start_link(shard_id, path) do - GenServer.start_link(__MODULE__, [shard_id, path]) - end - - def init([shard_id, path]) do - Shard.Manager.dispatch_to(shard_id, path, self()) - - {:ok, %State{shard_id: shard_id, path: path, store: %{}, reqs: %{}, retries: %{}}} - end - - def handle_call({:get, key, prefer_ask}, from, state) do - case state.store[key] do - nil -> - case prefer_ask do - [_ | _] -> - for peer <- prefer_ask do - Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) - end - _ -> - ask_random_peers(state, key) - end - reqs_key = case state.reqs[key] do - nil -> - MapSet.put(MapSet.new(), from) - ms -> - MapSet.put(ms, from) - end - state = put_in(state.reqs[key], reqs_key) - {:noreply, state} - v -> - {:reply, v, state} - end - end - - def handle_call({:put, val}, _from, state) do - hash = SData.term_hash val - state = %{state | store: Map.put(state.store, hash, val)} - {:reply, hash, state} - end - - def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do - state = case msg do - {:get, key} -> - case state.store[key] do - nil -> - Shard.Manager.send(peer_id, {state.shard_id, state.path, {:not_found, key}}) - v -> - Shard.Manager.send(peer_id, {state.shard_id, state.path, {:info, key, v}}) - end - state - {:info, hash, value} -> - if SData.term_hash value == hash do - reqs = case state.reqs[hash] do - nil -> state.reqs - pids -> - for pid <- pids do - GenServer.reply(pid, value) - end - Map.delete(state.reqs, hash) - end - state = %{state | retries: Map.delete(state.retries, hash)} - %{state | store: Map.put(state.store, hash, value), reqs: reqs} - else - state - end - {:not_found, key} -> - if state.reqs[key] != nil and state.store[key] == nil do - nretry = case state.retries[key] do - nil -> 1 - n -> n+1 - end - if nretry < 3 do - ask_random_peers(state, key) - %{state | retries: Map.put(state.retries, key, nretry)} - else - for pid <- state.reqs[key] do - GenServer.reply(pid, nil) - end - state = %{state | reqs: Map.delete(state.reqs, key)} - state = %{state | retries: Map.delete(state.retries, key)} - state - end - else - state - end - end - {:noreply, state} - end - - def ask_random_peers(state, key) do - peers = :ets.lookup(:shard_peer_db, state.shard_id) - |> Enum.shuffle - |> Enum.take(3) - for {_, peer} <- peers do - Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}}) - end - end - - - defimpl SData.PageStore do - def put(store, page) do - hash = GenServer.call(store.pid, {:put, page}) - { hash, store } - end - - def get(store, hash) do - try do - GenServer.call(store.pid, {:get, hash, store.prefer_ask}) - catch - :exit, {:timeout, _} -> nil - end - end - - def copy(store, other_store, hash) do - page = SData.PageStore.get(other_store, hash) - refs = SData.Page.refs(page) - for ref <- refs do - copy(store, other_store, ref) - end - GenServer.call(store.pid, {:put, page}) - store - end - - def free(store, _hash) do - store ## DO SOMETHING??? - end - end -end diff --git a/lib/app/chat.ex b/lib/app/chat.ex deleted file mode 100644 index 051fab6..0000000 --- a/lib/app/chat.ex +++ /dev/null @@ -1,208 +0,0 @@ -defmodule SApp.Chat do - @moduledoc """ - Shard application for a replicated chat room with full history. - - Chat rooms are globally identified by their channel name. - A chat room manifest is of the form: - - {:chat, channel_name} - - Future improvements: - - message signing - - storage of the chatroom messages to disk - - storage of the known peers that have this channel to disk - - use a DHT to find peers that are interested in this channel - - epidemic broadcast (carefull not to be too costly, - maybe by limiting the number of peers we talk to) - - partial synchronization only == data distributed over peers - """ - - use GenServer - - require Logger - alias SData.MerkleSearchTree, as: MST - - @doc """ - Start a process that connects to a given channel - """ - def start_link(channel) do - GenServer.start_link(__MODULE__, channel) - end - - @doc """ - Initialize channel process. - """ - def init(channel) do - manifest = {:chat, channel} - id = SData.term_hash manifest - - case Shard.Manager.register(id, manifest, self()) do - :ok -> - Shard.Manager.dispatch_to(id, nil, self()) - {:ok, block_store} = SApp.BlockStore.start_link(id, :block_store) - mst = %MST{store: %SApp.BlockStore{pid: block_store}, - cmp: &msg_cmp/2} - GenServer.cast(self(), :init_pull) - {:ok, - %{channel: channel, - id: id, - manifest: manifest, - block_store: block_store, - mst: mst, - subs: MapSet.new, - } - } - :redundant -> - exit(:redundant) - end - end - - @doc """ - Implementation of the :manifest call that returns the chat room's manifest - """ - def handle_call(:manifest, _from, state) do - {:reply, state.manifest, state} - end - - def handle_call({:read_history, top_bound, num}, _from, state) do - ret = MST.last(state.mst, top_bound, num) - {:reply, ret, state} - end - - @doc """ - Implementation of the :init_pull handler, which is called when the - process starts. It contacts all currently connected peers and asks them to - send data for this channel if they have some. - """ - def handle_cast(:init_pull, state) do - for {_, pid, _, _} <- :ets.tab2list(:peer_db) do - GenServer.cast(pid, {:send_msg, {:interested, [state.id]}}) - end - {:noreply, state} - end - - @doc """ - Implementation of the :chat_send handler. This is the main handler that is used - to send a message to the chat room. Puts the message in the store and syncs - with all connected peers. - """ - def handle_cast({:chat_send, msg}, state) do - msgitem = {(System.os_time :seconds), - Shard.Identity.get_nickname(), - msg} - prev_root = state.mst.root - mst = MST.insert(state.mst, msgitem) - state = %{state | mst: mst} - - for pid <- state.subs do - if Process.alive?(pid) do - send(pid, {:chat_send, state.channel, msgitem}) - end - end - - notif = {:append, prev_root, msgitem, mst.root} - for {_, peer_id} <- :ets.lookup(:shard_peer_db, state.id) do - Shard.Manager.send(peer_id, {state.id, nil, notif}) - end - - {:noreply, state} - end - - @doc """ - Implementation of the :interested handler, this is called when a peer we are - connected to asks to recieve data for this channel. - """ - def handle_cast({:interested, peer_id}, state) do - Shard.Manager.send(peer_id, {state.id, nil, {:root, state.mst.root}}) - {:noreply, state} - end - - def handle_cast({:subscribe, pid}, state) do - Process.monitor(pid) - new_subs = MapSet.put(state.subs, pid) - {:noreply, %{ state | subs: new_subs }} - end - - @doc """ - Implementation of the :msg handler, which is the main handler for messages - comming from other peers concerning this chat room. - - Messages are: - - `{:get, start}`: get some messages starting at a given Merkle hash - - `{:info, start, list, rest}`: put some messages and informs of the - Merkle hash of the store of older messages. - """ - def handle_cast({:msg, peer_id, _shard_id, nil, msg}, state) do - state = case msg do - {:get_manifest} -> - Shard.Manager.send(peer_id, {state.id, nil, {:manifest, state.manifest}}) - state - {:append, prev_root, msgitem, new_root} -> - # Append message: one single mesage has arrived - if new_root == state.mst.root do - # We already have the message, do nothing - state - else - # Try adding the message - if prev_root == state.mst.root do - mst2 = MST.insert(state.mst, msgitem) - if mst2.root == new_root do - # This was the only message missing, we are happy! - state = %{state | mst: mst2} - msg_callback(state, msgitem) - state - else - # More messages are missing, start a full merge - init_merge(state, new_root, peer_id) - end - else - init_merge(state, new_root, peer_id) - end - end - {:root, new_root} -> - if new_root == state.mst.root do - # already up to date, ignore - state - else - init_merge(state, new_root, peer_id) - end - x -> - Logger.info("Unhandled message: #{inspect x}") - state - end - {:noreply, state} - end - - defp init_merge(state, new_root, source_peer) do - # TODO: make the merge asynchronous - mgmst = %{state.mst | root: new_root} - mgmst = put_in(mgmst.store.prefer_ask, [source_peer]) - mst = MST.merge(state.mst, mgmst, fn msgitem, true -> msg_callback(state, msgitem) end) - %{state | mst: mst} - end - - def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do - new_subs = MapSet.delete(state.subs, pid) - {:noreply, %{ state | subs: new_subs }} - end - - defp msg_callback(state, {ts, nick, msg}) do - for pid <- state.subs do - if Process.alive?(pid) do - send(pid, {:chat_recv, state.channel, {ts, nick, msg}}) - end - end - end - - defp msg_cmp({ts1, nick1, msg1}, {ts2, nick2, msg2}) do - cond do - ts1 > ts2 -> :after - ts1 < ts2 -> :before - nick1 > nick2 -> :after - nick1 < nick2 -> :before - msg1 > msg2 -> :after - msg1 < msg2 -> :before - true -> :duplicate - end - end -end |