aboutsummaryrefslogtreecommitdiff
path: root/shard
diff options
context:
space:
mode:
Diffstat (limited to 'shard')
-rw-r--r--shard/lib/app/chat.ex3
-rw-r--r--shard/lib/app/directory.ex7
-rw-r--r--shard/lib/app/file.ex261
-rw-r--r--shard/lib/data/merkletree.ex11
-rw-r--r--shard/lib/shard_uri.ex46
5 files changed, 310 insertions, 18 deletions
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex
index 405210b..2795153 100644
--- a/shard/lib/app/chat.ex
+++ b/shard/lib/app/chat.ex
@@ -226,9 +226,6 @@ defmodule SApp.Chat do
{:noreply, state}
else
state = case msg do
- {:get_manifest} ->
- SNet.Manager.send_pid(conn_pid, {state.id, nil, {:manifest, state.manifest}})
- state
{:append, prev_root, msgitem, new_root} ->
# Append message: one single mesage has arrived
if new_root == state.mst.root do
diff --git a/shard/lib/app/directory.ex b/shard/lib/app/directory.ex
index 96e3a2b..e9482f7 100644
--- a/shard/lib/app/directory.ex
+++ b/shard/lib/app/directory.ex
@@ -145,6 +145,11 @@ defmodule SApp.Directory do
{:noreply, state}
end
+ def handle_cast({:peer_connected, peer_pid}, state) do
+ SNet.Manager.send_pid(peer_pid, {:interested, [state.id]})
+ {:noreply, state}
+ end
+
def handle_cast({:interested, peer_pid, auth}, state) do
if SNet.Group.in_group?(state.netgroup, peer_pid, auth) do
SNet.Manager.send_pid(peer_pid, {state.id, nil, {:update, SData.SignRev.signed(state.items), true}})
@@ -197,9 +202,7 @@ defmodule SApp.Directory do
defp send_deps(state) do
dict = SData.SignRev.get(state.items)
- IO.puts("items: #{inspect dict}")
deps = for {_, {m, stored}} <- dict, stored, do: m
- IO.puts("stored: #{inspect deps}")
GenServer.cast(Shard.Manager, {:dep_list, state.id, deps})
end
diff --git a/shard/lib/app/file.ex b/shard/lib/app/file.ex
index a874222..b28f742 100644
--- a/shard/lib/app/file.ex
+++ b/shard/lib/app/file.ex
@@ -20,6 +20,9 @@ defmodule SApp.File do
alias SData.MerkleTree, as: MT
+ @concurrent_reqs 10
+ @req_timeout_msec 2000
+
defmodule Manifest do
@moduledoc"""
Manifest for a file.
@@ -43,7 +46,7 @@ defmodule SApp.File do
end
defmodule State do
- defstruct [:infohash, :id, :manifest, :netgroup, :info, :infobin, :store, :missing, :path]
+ defstruct [:infohash, :id, :manifest, :netgroup, :info, :infobin, :store, :missing, :path, :reqs]
end
def start_link(manifest) do
@@ -53,39 +56,274 @@ defmodule SApp.File do
def init(manifest) do
%Manifest{infohash: infohash} = manifest
id = SData.term_hash manifest
-
Shard.Manager.dispatch_to(id, nil, self())
- {infobin, info} = case Shard.Manager.load_state(id) do
- nil -> {nil, nil}
- infobin -> {infobin, SData.term_unbin infobin}
- end
+
netgroup = %SNet.PubShardGroup{id: id}
SNet.Group.init_lookup(netgroup, self())
path = [Application.get_env(:shard, :data_path), "#{id|>Base.encode16}"] |> Path.join
-
{:ok, store} = SApp.PageStore.start_link(id, :meta, netgroup)
+ {infobin, info} = case Shard.Manager.load_state(id) do
+ nil ->
+ {nil, nil}
+ infobin ->
+ info = SData.term_unbin(infobin)
+ Process.send_after(self(), {:calc_missing, 1}, 100)
+ GenServer.cast(store, {:set_roots, [info.merkle_root]})
+ {infobin, info}
+ end
+
{:ok, %State{
id: id, infohash: infohash, manifest: manifest, netgroup: netgroup,
- infobin: infobin, info: info, store: store, missing: nil, path: path
+ infobin: infobin, info: info, store: store, missing: nil, path: path, reqs: %{}
}}
end
+ def handle_call(:manifest, _from, state) do
+ {:reply, state.manifest, state}
+ end
+
+ def handle_call(:get_info, _from, state) do
+ reply = cond do
+ state.info != nil and GenServer.call(state.store, {:have_rec, state.info.merkle_root}) ->
+ mt = get_mt(state)
+ nblk = MT.block_count(mt)
+ [infohash: state.infohash,
+ file_hash: state.info.file_hash,
+ size: state.info.size,
+ mime_type: state.info.mime_type,
+ num_blocks: nblk,
+ missing_blocks: if state.missing != nil do map_size(state.missing) else nil end]
+ state.info != nil ->
+ [infohash: state.infohash,
+ file_hash: state.info.file_hash,
+ size: state.info.size,
+ mime_type: state.info.mime_type]
+ true ->
+ [infohash: state.infohash]
+ end
+ {:reply, reply, state}
+ end
+
+ def handle_cast(:send_deps, state) do
+ GenServer.cast(Shard.Manager, {:dep_list, state.id, []})
+ {:noreply, state}
+ end
+
def handle_cast({:init_with, file_path, infobin, mt}, state) do
+ true = (SData.bin_hash(infobin) == state.infohash)
+ Shard.Manager.save_state(state.id, infobin)
info = SData.term_unbin(infobin)
- for {k, v} <- mt.store do
- {^k, _} = SData.PageStore.put(state.store, v)
+ for {k, v} <- mt.store.pages do
+ {^k, _} = SData.PageStore.put(%SApp.PageStore{pid: state.store, prefer_ask: []}, v)
end
File.copy!(file_path, state.path)
new_state = %{state |
infobin: infobin,
info: info,
+ missing: %{},
+ reqs: %{},
}
{:noreply, new_state}
end
- # TODO networking etc
+ def handle_cast({:peer_connected, conn_pid}, state) do
+ SNet.Manager.send_pid(conn_pid, {:interested, [state.id]})
+ {:noreply, state}
+ end
+
+ def handle_cast({:interested, conn_pid, auth}, state) do
+ if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
+ cond do
+ state.info == nil ->
+ SNet.Manager.send_pid(conn_pid, {state.id, nil, {:get_info}})
+ state.missing != nil ->
+ SNet.Manager.send_pid(conn_pid, {state.id, nil, {:info, state.infobin}})
+ SNet.Manager.send_pid(conn_pid, {state.id, nil, {:missing, Map.keys(state.missing), true}})
+ true ->
+ nil
+ end
+ end
+ {:noreply, state}
+ end
+
+ def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do
+ if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
+ # Ignore message
+ {:noreply, state}
+ else
+ Logger.info("<- #{inspect msg}")
+ state = case msg do
+ {:get_info} ->
+ if state.infobin != nil do
+ SNet.Manager.send_pid(conn_pid, {state.id, nil, {:info, state.infobin}})
+ end
+ state
+ {:info, infobin} ->
+ if state.infobin == nil and SData.bin_hash(infobin) == state.infohash do
+ Shard.Manager.save_state(state.id, infobin)
+ state = %{state | infobin: infobin, info: SData.term_unbin(infobin)}
+ GenServer.cast(state.store, {:set_roots, [state.info.merkle_root]})
+ Process.send_after(self(), {:calc_missing, 1}, 100)
+ state
+ else
+ state
+ end
+ {:missing, missing_list, rep} ->
+ if state.missing != nil do
+ have_list = missing_list |> Enum.filter(&(state.missing[&1] == nil))
+ SNet.Manager.send_pid(conn_pid, {state.id, nil, {:have, have_list}})
+ if rep and map_size(state.missing) > 0 do
+ SNet.Manager.send_pid(conn_pid, {state.id, nil, {:missing, Map.keys(state.missing), false}})
+ end
+ end
+ state
+ {:have, have_list} ->
+ peer_info = GenServer.call(conn_pid, :get_peer_info)
+ missing = Enum.reduce(have_list, state.missing, fn id, missing ->
+ if missing[id] != nil and peer_info not in missing[id] do
+ new_have = [peer_info | missing[id]]
+ put_in(missing[id], Enum.take(new_have, 10))
+ else
+ missing
+ end
+ end)
+ state = %{state | missing: missing}
+ send_reqs(state)
+ {:get, id} ->
+ if state.missing[id] == nil do
+ piece = get_piece(state, id)
+ SNet.Manager.send_pid(conn_pid, {state.id, nil, {:piece, id, piece}})
+ else
+ SNet.Manager.send_pid(conn_pid, {state.id, nil, {:dont_have, id}})
+ end
+ state
+ {:piece, id, blk} ->
+ mt = get_mt(state)
+ if state.missing[id] != nil and MT.get(mt, id) == :crypto.hash(:sha256, blk) do
+ state = put_piece(state, id, blk)
+ if map_size(state.missing) == 0 do
+ send(self(), {:calc_missing, 1})
+ end
+ state = put_in(state.reqs, Map.delete(state.reqs, id))
+ send_reqs(state)
+ else
+ state
+ end
+ end
+ {:noreply, state}
+ end
+ end
+
+ def handle_info({:calc_missing, iter}, state) do
+ if state.info != nil do
+ Logger.info(":calc_missing for #{state.id|>Base.encode16}")
+ missing = case GenServer.call(state.store, {:have_rec, state.info.merkle_root}) do
+ true ->
+ meta = get_mt(state)
+ n_blocks = MT.block_count(meta)
+ expected_hashes = MT.get_range(meta, 0..(n_blocks-1))
+ actual_hashes = if File.exists?(state.path) do
+ File.stream!(state.path, [], MT.block_size())
+ |> Enum.map(&(:crypto.hash(:sha256, &1)))
+ else
+ []
+ end
+ missing_1 = Enum.zip([expected_hashes, actual_hashes, 0..n_blocks])
+ |> Enum.filter(fn {a, b, _} -> a != b end)
+ |> Enum.map(&(elem(&1, 2)))
+ missing_2 = if Enum.count(actual_hashes) < n_blocks do Enum.to_list((Enum.count actual_hashes)..(n_blocks-1)) else [] end
+ missing = for k <- missing_1 ++ missing_2, into: %{} do
+ if state.missing != nil and state.missing[k] != nil do
+ {k, state.missing[k]}
+ else
+ {k, []}
+ end
+ end
+ Logger.info("Missing pieces: #{map_size missing} / #{n_blocks}")
+ missing
+ false ->
+ Logger.info("Incomplete Merkle tree meta data, requesting info from peers.")
+ GenServer.cast(state.store, {:set_roots, [state.info.merkle_root]})
+ Process.send_after(self(), {:calc_missing, iter + 1}, iter * 1000)
+ nil
+ end
+ state = %{state | missing: missing}
+ state = if missing != nil do
+ SNet.Group.broadcast(state.netgroup, {state.id, nil, {:missing, Map.keys(missing), false}})
+ send_reqs(state)
+ else
+ state
+ end
+ {:noreply, state}
+ else
+ Logger.info(":calc_missing for #{state.infohash|>Base.encode16} -> no info")
+ {:noreply, state}
+ end
+ end
+
+ def handle_info({:req_timeout, id}, state) do
+ if state.reqs[id] != nil do
+ state = put_in(state.reqs, Map.delete(state.reqs, id))
+ state = send_reqs(state)
+ {:noreply, state}
+ else
+ {:noreply, state}
+ end
+ end
+
+ defp send_reqs(state) do
+ can_req = for {k, v} <- state.missing, v != [], state.reqs[k] == nil, do: k
+ n_curr_req = map_size(state.reqs)
+
+ if can_req != [] and n_curr_req < @concurrent_reqs do
+ pieces = can_req |> Enum.sort() |> Enum.take(@concurrent_reqs - n_curr_req)
+ Enum.reduce(pieces, state, fn id, state ->
+ who = a_random_peer(state.missing[id])
+ Logger.info("Req #{id} to #{inspect who}")
+ SNet.Manager.send(who, {state.id, nil, {:get, id}})
+ Process.send_after(self(), {:req_timeout, id}, @req_timeout_msec)
+ put_in(state.reqs[id], who)
+ end)
+ else
+ state
+ end
+ end
+
+ defp get_piece(state, id) do
+ nil = state.missing[id]
+ fh = File.open!(state.path, [:read, :binary])
+ {:ok, blk} = :file.pread(fh, id * MT.block_size(), MT.block_size())
+ :file.close(fh)
+ blk
+ end
+
+ defp put_piece(state, id, blk) do
+ mt = get_mt(state)
+ true = (MT.get(mt, id) == :crypto.hash(:sha256, blk))
+ fh = File.open!(state.path, [:read, :write, :binary])
+ :ok = :file.pwrite(fh, id * MT.block_size(), blk)
+ :file.close(fh)
+ %{state | missing: Map.delete(state.missing, id),
+ reqs: Map.delete(state.reqs, id)}
+ end
+
+ defp a_random_peer(list) do
+ case list do
+ [a] -> a
+ [a|r] ->
+ if :rand.uniform() < 0.3 do
+ a
+ else
+ a_random_peer(r)
+ end
+ end
+ end
+
+ defp get_mt(state) do
+ %MT{root: state.info.merkle_root, store: %SApp.PageStore{pid: state.store, prefer_ask: []}}
+ end
# =========
# INTERFACE
@@ -110,5 +348,6 @@ defmodule SApp.File do
manifest = %Manifest{infohash: infohash}
pid = Shard.Manager.find_or_start(manifest)
GenServer.cast(pid, {:init_with, path, infobin, mt})
+ {:ok, manifest, pid}
end
end
diff --git a/shard/lib/data/merkletree.ex b/shard/lib/data/merkletree.ex
index 90361a3..73679cf 100644
--- a/shard/lib/data/merkletree.ex
+++ b/shard/lib/data/merkletree.ex
@@ -5,8 +5,8 @@ defmodule SData.MerkleTree do
alias SData.PageStore, as: Store
- @block_size 4096
- @tree_arity 64
+ @block_size 8192
+ @tree_arity 256
defstruct [:root, :store]
@@ -25,6 +25,13 @@ defmodule SData.MerkleTree do
end
@doc"""
+ Get the block size used by merkle trees
+ """
+ def block_size() do
+ @block_size
+ end
+
+ @doc"""
Create a Merkle tree for indexing a file.
"""
def create(file, store \\ SData.LocalStore.new()) do
diff --git a/shard/lib/shard_uri.ex b/shard/lib/shard_uri.ex
new file mode 100644
index 0000000..1b186d2
--- /dev/null
+++ b/shard/lib/shard_uri.ex
@@ -0,0 +1,46 @@
+defmodule ShardURI do
+ @moduledoc"""
+ Convert Shard manifests to and from text strings.
+ """
+
+ def from_manifest(m) do
+ case m do
+ %SApp.Chat.Manifest{channel: chan} -> "shard:chat:#{chan}"
+ %SApp.Chat.PrivChat.Manifest{pk_list: pk_list} ->
+ "shard:privchat:#{pk_list|>Enum.map(&Base.encode16/1)|>Enum.join(",")}"
+ %SApp.Identity.Manifest{pk: pk} ->
+ "shard:identity:#{pk|>Base.encode16}"
+ %SApp.Directory.Manifest{owner: owner, public: true, name: name} ->
+ "shard:dir:pub:#{owner|>Base.encode16}:#{name}"
+ %SApp.Directory.Manifest{owner: owner, public: false, name: name} ->
+ "shard:dir:priv:#{owner|>Base.encode16}:#{name}"
+ %SApp.File.Manifest{infohash: infohash} ->
+ "shard:file:#{infohash|>Base.encode16}"
+ end
+ end
+
+ def to_manifest(p) do
+ case p do
+ "shard:chat:" <> chan ->
+ %SApp.Chat.Manifest{channel: chan}
+ "shard:privchat:" <> pklist ->
+ pklist
+ |> String.split(",")
+ |> Enum.map(&parse_pk/1)
+ |> SApp.Chat.PrivChat.Manifest.new()
+ "shard:identity:" <> pk ->
+ %SApp.Identity.Manifest{pk: parse_pk pk}
+ "shard:dir:pub:" <> <<pk::bytes-size(64)>> <> ":" <> name ->
+ %SApp.Directory.Manifest{owner: parse_pk(pk), public: true, name: name}
+ "shard:dir:priv:" <> <<pk::bytes-size(64)>> <> ":" <> name ->
+ %SApp.Directory.Manifest{owner: parse_pk(pk), public: false, name: name}
+ "shard:file:" <> <<infohash::bytes-size(64)>> ->
+ %SApp.File.Manifest{infohash: Base.decode16!(infohash)}
+ end
+ end
+
+ def parse_pk(pkstr) do
+ 64 = byte_size pkstr
+ Base.decode16! pkstr
+ end
+end