aboutsummaryrefslogtreecommitdiff
path: root/shard/lib/app
diff options
context:
space:
mode:
Diffstat (limited to 'shard/lib/app')
-rw-r--r--shard/lib/app/chat.ex47
-rw-r--r--shard/lib/app/directory.ex21
-rw-r--r--shard/lib/app/file.ex23
-rw-r--r--shard/lib/app/identity.ex8
-rw-r--r--shard/lib/app/pagestore.ex52
5 files changed, 104 insertions, 47 deletions
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex
index ff0c97d..b046fc3 100644
--- a/shard/lib/app/chat.ex
+++ b/shard/lib/app/chat.ex
@@ -12,11 +12,7 @@ defmodule SApp.Chat do
%SApp.Chat.PrivChat.Manifest{pk_list: ordered_list_of_authorized_pks}
Future improvements:
- - message signing
- - storage of the chatroom messages to disk
- use a DHT to find peers that are interested in this channel
- - epidemic broadcast (carefull not to be too costly,
- maybe by limiting the number of peers we talk to)
- partial synchronization only == data distributed over peers
"""
@@ -32,7 +28,9 @@ defmodule SApp.Chat do
defmodule Manifest do
@moduledoc"""
- Manifest for a public chat room defined by its name.
+ Manifest for a public chat room defined by its name. Example:
+
+ %SApp.Chat.Manifest{channel: "test"}
"""
defstruct [:channel]
@@ -73,19 +71,22 @@ defmodule SApp.Chat do
# ==========
defmodule State do
+ @moduledoc"""
+ Internal state struct of chat shard.
+ """
+
defstruct [:id, :netgroup, :manifest, :page_store, :mst, :subs, :read]
end
@doc """
- Start a process that connects to a given channel
+ Start a process that connects to a given channel. Don't call directly, use for instance:
+
+ Shard.Manager.find_or_start %SApp.Chat.Manifest{channel: "my_chan"}
"""
def start_link(manifest) do
GenServer.start_link(__MODULE__, manifest)
end
- @doc """
- Initialize channel process.
- """
def init(manifest) do
id = SData.term_hash manifest
@@ -103,7 +104,7 @@ defmodule SApp.Chat do
end
root = cond do
root == nil -> nil
- GenServer.call(page_store, {:have_rec, root}) -> root
+ SApp.PageStore.have_rec?(page_store, root) -> root
true ->
Logger.warn "Not all pages for saved root were saved, restarting from an empty state!"
nil
@@ -124,9 +125,6 @@ defmodule SApp.Chat do
}
end
- @doc """
- Implementation of the :manifest call that returns the chat room's manifest
- """
def handle_call(:manifest, _from, state) do
{:reply, state.manifest, state}
end
@@ -166,11 +164,6 @@ defmodule SApp.Chat do
{:noreply, state}
end
- @doc """
- Implementation of the :chat_send handler. This is the main handler that is used
- to send a message to the chat room. Puts the message in the store and syncs
- with all connected peers.
- """
def handle_cast({:chat_send, pk, msg}, state) do
next_ts = case MST.last(state.mst, nil, 1) do
[] -> System.os_time :seconds
@@ -204,10 +197,6 @@ defmodule SApp.Chat do
{:noreply, state}
end
- @doc """
- Implementation of the :interested handler, this is called when a peer we are
- connected to asks to recieve data for this channel.
- """
def handle_cast({:interested, conn_pid, auth}, state) do
if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
SNet.Manager.send_pid(conn_pid, {state.id, nil, {:root, state.mst.root, true}})
@@ -221,10 +210,6 @@ defmodule SApp.Chat do
{:noreply, %{ state | subs: new_subs }}
end
- @doc """
- Implementation of the :msg handler, which is the main handler for messages
- comming from other peers concerning this chat room.
- """
def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do
if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
# Ignore message
@@ -245,7 +230,7 @@ defmodule SApp.Chat do
mst2 = MST.insert(state.mst, msgitem)
if mst2.root == new_root do
state = %{state | mst: mst2}
- GenServer.cast(state.page_store, {:set_roots, [mst2.root]})
+ SApp.PageStore.set_roots(state.page_store, [mst2.root])
save_state(state)
msg_callback(state, msgitem)
SNet.Group.broadcast(state.netgroup, {state.id, nil, msg}, exclude_pid: [conn_pid])
@@ -311,7 +296,7 @@ defmodule SApp.Chat do
for x <- new do
msg_callback(state, x)
end
- GenServer.cast(state.page_store, {:set_roots, [mst.root]})
+ SApp.PageStore.set_roots(state.page_store, [mst.root])
state = %{state | mst: mst}
save_state(state)
if state.mst.root != old_root do
@@ -365,15 +350,15 @@ defmodule SApp.Chat do
The process calling this function will start recieving messages of the form:
- {:chat_recv, manifest, {pk, msgbin, sign}}
+ {:chat_recv, manifest, {pk, msgbin, sign}}
or
- {:chat_send, manifest, {pk, msgbin, sign}}
+ {:chat_send, manifest, {pk, msgbin, sign}}
msgbin can be used in the following way:
- {timestamp, message} = SData.term_unbin msgbin
+ {timestamp, message} = SData.term_unbin msgbin
"""
def subscribe(shard_pid) do
GenServer.cast(shard_pid, {:subscribe, self()})
diff --git a/shard/lib/app/directory.ex b/shard/lib/app/directory.ex
index cbea8c3..257e8b9 100644
--- a/shard/lib/app/directory.ex
+++ b/shard/lib/app/directory.ex
@@ -29,9 +29,18 @@ defmodule SApp.Directory do
end
defmodule State do
+ @moduledoc"""
+ Internal state struct of directory shard.
+ """
+
defstruct [:owner, :public, :name, :manifest, :id, :netgroup, :items, :revitems]
end
+ @doc"""
+ Start a process that connects to a given channel. Don't call directly, use for instance:
+
+ Shard.Manager.find_or_start %SApp.Directory.Manifest{owner: my_pk, public: false, name: "collection"}
+ """
def start_link(manifest) do
GenServer.start_link(__MODULE__, manifest)
end
@@ -217,23 +226,23 @@ defmodule SApp.Directory do
@doc"""
Return list of items stored in this directory.
- Returns a dictionnary of %{name => {manifest, stored?}}.
+ Returns a dictionnary of `%{name => {manifest, stored?}}`.
"""
def get_items(pid) do
GenServer.call(pid, :get_items)
end
@doc"""
- Return the manifest of item with a given name in directory, or nil if not found.
+ Return the manifest of item with a given name in directory, or `nil` if not found.
- Equivalent to get_items(pid)[name] but better.
+ Equivalent to `get_items(pid)[name]` but better.
"""
def read(pid, name) do
GenServer.call(pid, {:read, name})
end
@doc"""
- Find an item in the directory by its manifest. Returns name if found or nil if not found.
+ Find an item in the directory by its manifest. Returns name if found or `nil` if not found.
"""
def find(pid, manifest) do
GenServer.call(pid, {:find, manifest})
@@ -241,8 +250,8 @@ defmodule SApp.Directory do
@doc"""
Add an item to this directory. An item is a name for a shard manifest.
- An item added to a directory becomes a dependency of the directory, i.e.
- if the directory is pinned then all items inside are pinned as well.
+ An item added to a directory with `stored = true` becomes a dependency of the directory,
+ i.e. if the directory is pinned then all items inside are pinned as well.
"""
def add_item(pid, name, manifest, stored \\ true) do
GenServer.call(pid, {:add_item, name, manifest, stored})
diff --git a/shard/lib/app/file.ex b/shard/lib/app/file.ex
index e2a9798..0e07cc3 100644
--- a/shard/lib/app/file.ex
+++ b/shard/lib/app/file.ex
@@ -9,9 +9,12 @@ defmodule SApp.File do
file_hash: hash
size: int
mime_type: string
- }
+ }
+
+ The file is cut in blocks that are collected in a k-ary Merkle tree
+ (see SData.MerkleTree for block size and k value).
- The file is cut in blocks of 4kb that are collected in a 64-ary Merkle tree.
+ TODO I feel bad about some parts of the logic in here.
"""
use GenServer
@@ -26,7 +29,7 @@ defmodule SApp.File do
defmodule Manifest do
@moduledoc"""
Manifest for a file.
- The file is identified by the root hash of its Merkle tree and by its mime type.
+ The file is identified by its infohash, which is the hash of a `SApp.File.Info` struct.
"""
defstruct [:infohash]
@@ -46,9 +49,21 @@ defmodule SApp.File do
end
defmodule State do
+ @moduledoc"""
+ Internal state struct for file shard.
+ """
defstruct [:infohash, :id, :manifest, :netgroup, :info, :infobin, :store, :missing, :path, :reqs]
end
+ @doc """
+ Start a process that connects to a given channel. Don't call directly, use for instance:
+
+ Shard.Manager.find_or_start %SApp.File.Manifest{infohash: "some_infohash"}
+
+ or:
+
+ SApp.File.Create("/path/to/file", "mime/type")
+ """
def start_link(manifest) do
GenServer.start_link(__MODULE__, manifest)
end
@@ -229,7 +244,7 @@ defmodule SApp.File do
true ->
meta = get_mt(state)
n_blocks = MT.block_count(meta)
- expected_hashes = MT.get_range(meta, 0..(n_blocks-1))
+ expected_hashes = MT.get_all(meta)
actual_hashes = if File.exists?(state.path) do
File.stream!(state.path, [], MT.block_size())
|> Enum.map(&(:crypto.hash(:sha256, &1)))
diff --git a/shard/lib/app/identity.ex b/shard/lib/app/identity.ex
index 78abbe7..7422822 100644
--- a/shard/lib/app/identity.ex
+++ b/shard/lib/app/identity.ex
@@ -34,9 +34,17 @@ defmodule SApp.Identity do
end
defmodule State do
+ @moduledoc"""
+ Internal state struct for identity shard.
+ """
defstruct [:pk, :id, :state, :netgroup]
end
+ @doc """
+ Start a process that connects to a given channel. Don't call directly, use for instance:
+
+ Shard.Manager.find_or_start %SApp.Identity.Manifest{pk: some_public_key}
+ """
def start_link(manifest) do
GenServer.start_link(__MODULE__, manifest)
end
diff --git a/shard/lib/app/pagestore.ex b/shard/lib/app/pagestore.ex
index 3cda51d..0cbb10a 100644
--- a/shard/lib/app/pagestore.ex
+++ b/shard/lib/app/pagestore.ex
@@ -7,12 +7,29 @@ defmodule SApp.PageStore do
Uses an ETS table of:
- { page_id, why_have_it } -- waiting for data
- { page_id, why_have_it, data } -- once we have the data
+ { page_id, why_have_it } # waiting for data
+ { page_id, why_have_it, data } # once we have the data
- why_have_it := :root
- | {:req_by, some_other_page_id}
- | {:cached, expiry_date}
+ why_have_it := :root
+ | {:req_by, some_other_page_id}
+ | {:cached, expiry_date}
+
+ TODO: at the moment we are trying to pull all missing pages at once from our peers.
+ This can work for metadata that isn't too big but won't work with bigger objects.
+ Have a smart strategy where we limit the number of requests currently in-flight but
+ still make sure everything gets pulled in. This will also pave the way to selectively
+ pulling in pages, for instance if we have a function to give them a priority score and
+ a maximum stored page count.
+
+ A `SApp.PageStore` can be used as a `SData.PageStore` in the following way:
+
+ %SApp.PageStore{pid: store_pid}
+
+ or:
+
+ %SApp.PageStore{pid: store_pid, prefer_ask: [connection_pid, ...]}
+
+ In the second case, missing pages will be requested first to the specified peers.
"""
use GenServer
@@ -25,6 +42,9 @@ defmodule SApp.PageStore do
@max_failures 4 # Maximum of peers that reply not_found before we abandon
defmodule State do
+ @moduledoc"""
+ Internal state struct of pagestore process.
+ """
defstruct [:shard_id, :path, :netgroup, :store, :reqs, :retries, :store_path]
end
@@ -258,7 +278,7 @@ defmodule SApp.PageStore do
{:noreply, state}
end
- def ask_random_peers(state, key) do
+ defp ask_random_peers(state, key) do
SNet.Group.broadcast(state.netgroup, {state.shard_id, state.path, {:get, key}}, nmax: 3)
end
@@ -289,4 +309,24 @@ defmodule SApp.PageStore do
store ## DO SOMETHING???
end
end
+
+ # ====================
+ # PAGE STORE INTERFACE
+ # ====================
+
+ @doc"""
+ Returns `true` if the page store currently stores the specified root page
+ and all its dependencies, recursively.
+ """
+ def have_rec?(pid, root) do
+ GenServer.call(pid, {:have_rec, root})
+ end
+
+ @doc"""
+ Define the set of root pages we are interested in. This will start pulling in
+ the defined pages and all their dependencies recursively if we don't have them.
+ """
+ def set_roots(pid, roots) do
+ GenServer.cast(pid, {:set_roots, roots})
+ end
end