diff options
Diffstat (limited to 'shard/lib/app')
-rw-r--r-- | shard/lib/app/chat.ex | 47 | ||||
-rw-r--r-- | shard/lib/app/directory.ex | 21 | ||||
-rw-r--r-- | shard/lib/app/file.ex | 23 | ||||
-rw-r--r-- | shard/lib/app/identity.ex | 8 | ||||
-rw-r--r-- | shard/lib/app/pagestore.ex | 52 |
5 files changed, 104 insertions, 47 deletions
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex index ff0c97d..b046fc3 100644 --- a/shard/lib/app/chat.ex +++ b/shard/lib/app/chat.ex @@ -12,11 +12,7 @@ defmodule SApp.Chat do %SApp.Chat.PrivChat.Manifest{pk_list: ordered_list_of_authorized_pks} Future improvements: - - message signing - - storage of the chatroom messages to disk - use a DHT to find peers that are interested in this channel - - epidemic broadcast (carefull not to be too costly, - maybe by limiting the number of peers we talk to) - partial synchronization only == data distributed over peers """ @@ -32,7 +28,9 @@ defmodule SApp.Chat do defmodule Manifest do @moduledoc""" - Manifest for a public chat room defined by its name. + Manifest for a public chat room defined by its name. Example: + + %SApp.Chat.Manifest{channel: "test"} """ defstruct [:channel] @@ -73,19 +71,22 @@ defmodule SApp.Chat do # ========== defmodule State do + @moduledoc""" + Internal state struct of chat shard. + """ + defstruct [:id, :netgroup, :manifest, :page_store, :mst, :subs, :read] end @doc """ - Start a process that connects to a given channel + Start a process that connects to a given channel. Don't call directly, use for instance: + + Shard.Manager.find_or_start %SApp.Chat.Manifest{channel: "my_chan"} """ def start_link(manifest) do GenServer.start_link(__MODULE__, manifest) end - @doc """ - Initialize channel process. - """ def init(manifest) do id = SData.term_hash manifest @@ -103,7 +104,7 @@ defmodule SApp.Chat do end root = cond do root == nil -> nil - GenServer.call(page_store, {:have_rec, root}) -> root + SApp.PageStore.have_rec?(page_store, root) -> root true -> Logger.warn "Not all pages for saved root were saved, restarting from an empty state!" nil @@ -124,9 +125,6 @@ defmodule SApp.Chat do } end - @doc """ - Implementation of the :manifest call that returns the chat room's manifest - """ def handle_call(:manifest, _from, state) do {:reply, state.manifest, state} end @@ -166,11 +164,6 @@ defmodule SApp.Chat do {:noreply, state} end - @doc """ - Implementation of the :chat_send handler. This is the main handler that is used - to send a message to the chat room. Puts the message in the store and syncs - with all connected peers. - """ def handle_cast({:chat_send, pk, msg}, state) do next_ts = case MST.last(state.mst, nil, 1) do [] -> System.os_time :seconds @@ -204,10 +197,6 @@ defmodule SApp.Chat do {:noreply, state} end - @doc """ - Implementation of the :interested handler, this is called when a peer we are - connected to asks to recieve data for this channel. - """ def handle_cast({:interested, conn_pid, auth}, state) do if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do SNet.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root, true}}) @@ -221,10 +210,6 @@ defmodule SApp.Chat do {:noreply, %{ state | subs: new_subs }} end - @doc """ - Implementation of the :msg handler, which is the main handler for messages - comming from other peers concerning this chat room. - """ def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do # Ignore message @@ -245,7 +230,7 @@ defmodule SApp.Chat do mst2 = MST.insert(state.mst, msgitem) if mst2.root == new_root do state = %{state | mst: mst2} - GenServer.cast(state.page_store, {:set_roots, [mst2.root]}) + SApp.PageStore.set_roots(state.page_store, [mst2.root]) save_state(state) msg_callback(state, msgitem) SNet.Group.broadcast(state.netgroup, {state.id, nil, msg}, exclude_pid: [conn_pid]) @@ -311,7 +296,7 @@ defmodule SApp.Chat do for x <- new do msg_callback(state, x) end - GenServer.cast(state.page_store, {:set_roots, [mst.root]}) + SApp.PageStore.set_roots(state.page_store, [mst.root]) state = %{state | mst: mst} save_state(state) if state.mst.root != old_root do @@ -365,15 +350,15 @@ defmodule SApp.Chat do The process calling this function will start recieving messages of the form: - {:chat_recv, manifest, {pk, msgbin, sign}} + {:chat_recv, manifest, {pk, msgbin, sign}} or - {:chat_send, manifest, {pk, msgbin, sign}} + {:chat_send, manifest, {pk, msgbin, sign}} msgbin can be used in the following way: - {timestamp, message} = SData.term_unbin msgbin + {timestamp, message} = SData.term_unbin msgbin """ def subscribe(shard_pid) do GenServer.cast(shard_pid, {:subscribe, self()}) diff --git a/shard/lib/app/directory.ex b/shard/lib/app/directory.ex index cbea8c3..257e8b9 100644 --- a/shard/lib/app/directory.ex +++ b/shard/lib/app/directory.ex @@ -29,9 +29,18 @@ defmodule SApp.Directory do end defmodule State do + @moduledoc""" + Internal state struct of directory shard. + """ + defstruct [:owner, :public, :name, :manifest, :id, :netgroup, :items, :revitems] end + @doc""" + Start a process that connects to a given channel. Don't call directly, use for instance: + + Shard.Manager.find_or_start %SApp.Directory.Manifest{owner: my_pk, public: false, name: "collection"} + """ def start_link(manifest) do GenServer.start_link(__MODULE__, manifest) end @@ -217,23 +226,23 @@ defmodule SApp.Directory do @doc""" Return list of items stored in this directory. - Returns a dictionnary of %{name => {manifest, stored?}}. + Returns a dictionnary of `%{name => {manifest, stored?}}`. """ def get_items(pid) do GenServer.call(pid, :get_items) end @doc""" - Return the manifest of item with a given name in directory, or nil if not found. + Return the manifest of item with a given name in directory, or `nil` if not found. - Equivalent to get_items(pid)[name] but better. + Equivalent to `get_items(pid)[name]` but better. """ def read(pid, name) do GenServer.call(pid, {:read, name}) end @doc""" - Find an item in the directory by its manifest. Returns name if found or nil if not found. + Find an item in the directory by its manifest. Returns name if found or `nil` if not found. """ def find(pid, manifest) do GenServer.call(pid, {:find, manifest}) @@ -241,8 +250,8 @@ defmodule SApp.Directory do @doc""" Add an item to this directory. An item is a name for a shard manifest. - An item added to a directory becomes a dependency of the directory, i.e. - if the directory is pinned then all items inside are pinned as well. + An item added to a directory with `stored = true` becomes a dependency of the directory, + i.e. if the directory is pinned then all items inside are pinned as well. """ def add_item(pid, name, manifest, stored \\ true) do GenServer.call(pid, {:add_item, name, manifest, stored}) diff --git a/shard/lib/app/file.ex b/shard/lib/app/file.ex index e2a9798..0e07cc3 100644 --- a/shard/lib/app/file.ex +++ b/shard/lib/app/file.ex @@ -9,9 +9,12 @@ defmodule SApp.File do file_hash: hash size: int mime_type: string - } + } + + The file is cut in blocks that are collected in a k-ary Merkle tree + (see SData.MerkleTree for block size and k value). - The file is cut in blocks of 4kb that are collected in a 64-ary Merkle tree. + TODO I feel bad about some parts of the logic in here. """ use GenServer @@ -26,7 +29,7 @@ defmodule SApp.File do defmodule Manifest do @moduledoc""" Manifest for a file. - The file is identified by the root hash of its Merkle tree and by its mime type. + The file is identified by its infohash, which is the hash of a `SApp.File.Info` struct. """ defstruct [:infohash] @@ -46,9 +49,21 @@ defmodule SApp.File do end defmodule State do + @moduledoc""" + Internal state struct for file shard. + """ defstruct [:infohash, :id, :manifest, :netgroup, :info, :infobin, :store, :missing, :path, :reqs] end + @doc """ + Start a process that connects to a given channel. Don't call directly, use for instance: + + Shard.Manager.find_or_start %SApp.File.Manifest{infohash: "some_infohash"} + + or: + + SApp.File.Create("/path/to/file", "mime/type") + """ def start_link(manifest) do GenServer.start_link(__MODULE__, manifest) end @@ -229,7 +244,7 @@ defmodule SApp.File do true -> meta = get_mt(state) n_blocks = MT.block_count(meta) - expected_hashes = MT.get_range(meta, 0..(n_blocks-1)) + expected_hashes = MT.get_all(meta) actual_hashes = if File.exists?(state.path) do File.stream!(state.path, [], MT.block_size()) |> Enum.map(&(:crypto.hash(:sha256, &1))) diff --git a/shard/lib/app/identity.ex b/shard/lib/app/identity.ex index 78abbe7..7422822 100644 --- a/shard/lib/app/identity.ex +++ b/shard/lib/app/identity.ex @@ -34,9 +34,17 @@ defmodule SApp.Identity do end defmodule State do + @moduledoc""" + Internal state struct for identity shard. + """ defstruct [:pk, :id, :state, :netgroup] end + @doc """ + Start a process that connects to a given channel. Don't call directly, use for instance: + + Shard.Manager.find_or_start %SApp.Identity.Manifest{pk: some_public_key} + """ def start_link(manifest) do GenServer.start_link(__MODULE__, manifest) end diff --git a/shard/lib/app/pagestore.ex b/shard/lib/app/pagestore.ex index 3cda51d..0cbb10a 100644 --- a/shard/lib/app/pagestore.ex +++ b/shard/lib/app/pagestore.ex @@ -7,12 +7,29 @@ defmodule SApp.PageStore do Uses an ETS table of: - { page_id, why_have_it } -- waiting for data - { page_id, why_have_it, data } -- once we have the data + { page_id, why_have_it } # waiting for data + { page_id, why_have_it, data } # once we have the data - why_have_it := :root - | {:req_by, some_other_page_id} - | {:cached, expiry_date} + why_have_it := :root + | {:req_by, some_other_page_id} + | {:cached, expiry_date} + + TODO: at the moment we are trying to pull all missing pages at once from our peers. + This can work for metadata that isn't too big but won't work with bigger objects. + Have a smart strategy where we limit the number of requests currently in-flight but + still make sure everything gets pulled in. This will also pave the way to selectively + pulling in pages, for instance if we have a function to give them a priority score and + a maximum stored page count. + + A `SApp.PageStore` can be used as a `SData.PageStore` in the following way: + + %SApp.PageStore{pid: store_pid} + + or: + + %SApp.PageStore{pid: store_pid, prefer_ask: [connection_pid, ...]} + + In the second case, missing pages will be requested first to the specified peers. """ use GenServer @@ -25,6 +42,9 @@ defmodule SApp.PageStore do @max_failures 4 # Maximum of peers that reply not_found before we abandon defmodule State do + @moduledoc""" + Internal state struct of pagestore process. + """ defstruct [:shard_id, :path, :netgroup, :store, :reqs, :retries, :store_path] end @@ -258,7 +278,7 @@ defmodule SApp.PageStore do {:noreply, state} end - def ask_random_peers(state, key) do + defp ask_random_peers(state, key) do SNet.Group.broadcast(state.netgroup, {state.shard_id, state.path, {:get, key}}, nmax: 3) end @@ -289,4 +309,24 @@ defmodule SApp.PageStore do store ## DO SOMETHING??? end end + + # ==================== + # PAGE STORE INTERFACE + # ==================== + + @doc""" + Returns `true` if the page store currently stores the specified root page + and all its dependencies, recursively. + """ + def have_rec?(pid, root) do + GenServer.call(pid, {:have_rec, root}) + end + + @doc""" + Define the set of root pages we are interested in. This will start pulling in + the defined pages and all their dependencies recursively if we don't have them. + """ + def set_roots(pid, roots) do + GenServer.cast(pid, {:set_roots, roots}) + end end |