defmodule SApp.File do
@moduledoc"""
Shard application for a file identified by its infohash. The file cannot be modified.
The infohash is the hash of an info struct containing:
%Info{
merkle_root: hash
file_hash: hash
size: int
mime_type: string
}
The file is cut in blocks of 4kb that are collected in a 64-ary Merkle tree.
"""
use GenServer
require Logger
alias SData.MerkleTree, as: MT
# Upper bound on the number of piece requests that may be pending at once
# (compared against the size of the `reqs` map when issuing new requests).
@concurrent_reqs 10
# Delay, in milliseconds, after which a `{:req_timeout, id}` message is
# delivered for a piece request, removing it from the pending set if it is
# still there.
@req_timeout_msec 2000
defmodule Manifest do
  @moduledoc """
  Manifest for a file.

  The file is identified by its infohash, i.e. the hash of the serialized
  `SApp.File.Info` struct that describes it.
  """
  defstruct [:infohash]

  defimpl Shard.Manifest do
    # The module implementing the shard described by this manifest.
    def module(_m), do: SApp.File

    # A manifest is valid only when its infohash is a 32-byte binary.
    # `is_binary/1` is checked first so that a missing (nil) or malformed
    # infohash yields `false` instead of raising from `byte_size/1`.
    def is_valid?(m) do
      is_binary(m.infohash) and byte_size(m.infohash) == 32
    end
  end
end
defmodule Info do
  @moduledoc """
  Description of a file.

  Holds the root of the file's Merkle tree, the hash of the whole file, the
  file size and its mime type. The hash of the serialized struct is the
  infohash that identifies the shard.
  """
  defstruct merkle_root: nil, file_hash: nil, size: nil, mime_type: nil
end
defmodule State do
  # Process state of the file shard:
  #   infohash, id, manifest - identity of the shard
  #   netgroup               - network group used to reach peers
  #   info, infobin          - decoded / serialized info struct (nil until known)
  #   store                  - page store process holding Merkle tree pages
  #   missing                - map of missing block id => peers known to have it
  #                            (nil until it has been computed)
  #   path                   - location of the file data on disk
  #   reqs                   - map of pending block requests, block id => peer
  defstruct infohash: nil,
            id: nil,
            manifest: nil,
            netgroup: nil,
            info: nil,
            infobin: nil,
            store: nil,
            missing: nil,
            path: nil,
            reqs: nil
end
@doc """
Starts the shard process for `manifest`.

The manifest is passed unchanged to `init/1`.
"""
def start_link(manifest), do: GenServer.start_link(__MODULE__, manifest)
# GenServer callback. Registers the shard with the manager, starts peer
# lookup for its network group, starts the page store and restores the info
# struct from saved state when one exists.
def init(manifest) do
  %Manifest{infohash: infohash} = manifest
  id = SData.term_hash(manifest)
  Shard.Manager.dispatch_to(id, nil, self())

  netgroup = %SNet.PubShardGroup{id: id}
  SNet.Group.init_lookup(netgroup, self())

  # The file data lives in the configured data directory, named after the
  # hex-encoded shard id.
  data_dir = Application.get_env(:shard, :data_path)
  path = Path.join([data_dir, Base.encode16(id)])

  {:ok, store} = SApp.PageStore.start_link(id, :meta, netgroup)

  infobin = Shard.Manager.load_state(id)

  info =
    if infobin != nil do
      decoded = SData.term_unbin(infobin)
      # Info is already known: schedule a scan of the local file and tell the
      # page store which Merkle root it has to keep.
      Process.send_after(self(), {:calc_missing, 1}, 100)
      GenServer.cast(store, {:set_roots, [decoded.merkle_root]})
      decoded
    end

  state = %State{
    id: id,
    infohash: infohash,
    manifest: manifest,
    netgroup: netgroup,
    infobin: infobin,
    info: info,
    store: store,
    missing: nil,
    path: path,
    reqs: %{}
  }

  {:ok, state}
end
# Replies with the manifest this shard was started with.
def handle_call(:manifest, _from, state) do
  %{manifest: manifest} = state
  {:reply, manifest, state}
end
# Deletes the shard's data: asks the page store to delete itself, removes the
# file on disk and stops this process normally, replying `:ok`.
def handle_call(:delete_shard, _from, state) do
  %{store: store, path: path} = state
  GenServer.call(store, :delete_store)
  # The result is ignored on purpose: removal is attempted whether or not
  # the file exists.
  File.rm(path)
  {:stop, :normal, :ok, state}
end
# Replies with a keyword list describing the shard. The amount of detail
# depends on what is known:
#   * no info struct yet      -> only the infohash
#   * info struct known       -> plus file hash, size and mime type
#   * Merkle tree fully known -> plus block count, number of missing blocks
#                                (nil if not computed yet) and the file path
def handle_call(:get_info, _from, state) do
  reply =
    case state.info do
      nil ->
        [infohash: state.infohash]

      info ->
        summary = [
          infohash: state.infohash,
          file_hash: info.file_hash,
          size: info.size,
          mime_type: info.mime_type
        ]

        if GenServer.call(state.store, {:have_rec, info.merkle_root}) do
          num_blocks = MT.block_count(get_mt(state))

          missing_blocks =
            case state.missing do
              nil -> nil
              missing -> map_size(missing)
            end

          summary ++
            [num_blocks: num_blocks, missing_blocks: missing_blocks, path: state.path]
        else
          summary
        end
    end

  {:reply, reply, state}
end
# Reports this shard's dependencies to the shard manager; a file shard
# always reports an empty list.
def handle_cast(:send_deps, state) do
  dep_msg = {:dep_list, state.id, []}
  GenServer.cast(Shard.Manager, dep_msg)
  {:noreply, state}
end
# Initializes the shard from a local file: saves the info struct, stores
# every page of the given Merkle tree in the page store, copies the file to
# the shard's path and marks the file as complete (no missing blocks).
def handle_cast({:init_with, file_path, infobin, mt}, state) do
  # The serialized info must hash to the infohash this shard was started for;
  # anything else is a programming error and crashes the process.
  true = SData.bin_hash(infobin) == state.infohash

  Shard.Manager.save_state(state.id, infobin)
  info = SData.term_unbin(infobin)

  page_store = %SApp.PageStore{pid: state.store, prefer_ask: []}

  for {page_hash, page} <- mt.store.pages do
    # The store must hand back the same hash the page was filed under.
    {^page_hash, _} = SData.PageStore.put(page_store, page)
  end

  File.copy!(file_path, state.path)

  {:noreply, %{state | infobin: infobin, info: info, missing: %{}, reqs: %{}}}
end
# A peer connection was established: tell the peer we are interested in
# this shard.
def handle_cast({:peer_connected, conn_pid}, state) do
  announcement = {:interested, [state.id]}
  SNet.Manager.send_pid(conn_pid, announcement)
  {:noreply, state}
end
# A peer declared interest in this shard. If the peer belongs to the shard's
# network group:
#   * without an info struct, ask the peer for it;
#   * with a computed missing-block map, send our info struct followed by the
#     list of blocks we miss (asking the peer to reply with its own list);
#   * otherwise send nothing.
def handle_cast({:interested, conn_pid, auth}, state) do
  if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
    case {state.info, state.missing} do
      {nil, _} ->
        SNet.Manager.send_pid(conn_pid, {state.id, nil, {:get_info}})

      {_, nil} ->
        nil

      {_, missing} ->
        SNet.Manager.send_pid(conn_pid, {state.id, nil, {:info, state.infobin}})
        SNet.Manager.send_pid(conn_pid, {state.id, nil, {:missing, Map.keys(missing), true}})
    end
  end

  {:noreply, state}
end
# Handles a shard-level message `msg` received over the peer connection
# `conn_pid`. Messages from peers that are not members of the shard's network
# group are ignored. The message content comes from a remote peer, so every
# handler below tolerates malformed or untimely messages instead of crashing
# the shard process.
def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do
  if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
    {:noreply, handle_peer_msg(msg, conn_pid, state)}
  else
    # Ignore message
    {:noreply, state}
  end
end

# Maximum number of peers remembered per missing block.
@max_peers_per_piece 10

# Peer asks for the info struct: answer only if we have it.
defp handle_peer_msg({:get_info}, conn_pid, state) do
  if state.infobin != nil do
    SNet.Manager.send_pid(conn_pid, {state.id, nil, {:info, state.infobin}})
  end

  state
end

# Peer sends the info struct: accept it only if we do not have one yet and it
# hashes to our infohash, then persist it and schedule a scan of the file.
defp handle_peer_msg({:info, infobin}, _conn_pid, state) do
  if state.infobin == nil and SData.bin_hash(infobin) == state.infohash do
    Shard.Manager.save_state(state.id, infobin)
    state = %{state | infobin: infobin, info: SData.term_unbin(infobin)}
    GenServer.cast(state.store, {:set_roots, [state.info.merkle_root]})
    Process.send_after(self(), {:calc_missing, 1}, 100)
    state
  else
    state
  end
end

# Peer lists the blocks it misses: reply with those we are not missing, and,
# if the peer asked for it (`rep`), with our own list of missing blocks.
defp handle_peer_msg({:missing, missing_list, rep}, conn_pid, state)
     when is_list(missing_list) do
  if state.missing != nil do
    have_list = Enum.filter(missing_list, &(state.missing[&1] == nil))
    SNet.Manager.send_pid(conn_pid, {state.id, nil, {:have, have_list}})

    # `rep == true` rather than `rep and ...`: a non-boolean flag from the
    # peer must not raise.
    if rep == true and map_size(state.missing) > 0 do
      SNet.Manager.send_pid(conn_pid, {state.id, nil, {:missing, Map.keys(state.missing), false}})
    end
  end

  state
end

# Peer lists blocks it has: remember it as a source for the blocks we miss
# and try to issue requests. Ignored until our missing-block map has been
# computed, since there is nothing to record the peer against.
defp handle_peer_msg({:have, have_list}, conn_pid, state) when is_list(have_list) do
  if state.missing == nil do
    state
  else
    peer_info = GenServer.call(conn_pid, :get_peer_info)

    missing =
      Enum.reduce(have_list, state.missing, fn id, acc ->
        peers = acc[id]

        if peers != nil and peer_info not in peers do
          Map.put(acc, id, Enum.take([peer_info | peers], @max_peers_per_piece))
        else
          acc
        end
      end)

    send_reqs(%{state | missing: missing})
  end
end

# Peer requests a block: send it if we can serve it, otherwise tell the peer
# we do not have it.
defp handle_peer_msg({:get, id}, conn_pid, state) do
  if piece_available?(state, id) do
    piece = get_piece(state, id)
    SNet.Manager.send_pid(conn_pid, {state.id, nil, {:piece, id, piece}})
  else
    SNet.Manager.send_pid(conn_pid, {state.id, nil, {:dont_have, id}})
  end

  state
end

# Peer sends a block: store it only if we are actually missing that block
# and the data matches the hash recorded in the Merkle tree. The missing-map
# check comes first so the Merkle tree is never consulted before the info
# struct is known.
defp handle_peer_msg({:piece, id, blk}, _conn_pid, state) when is_binary(blk) do
  if state.missing != nil and state.missing[id] != nil and
       MT.get(get_mt(state), id) == :crypto.hash(:sha256, blk) do
    # put_piece/3 also removes the block from the pending requests.
    state = put_piece(state, id, blk)

    if map_size(state.missing) == 0 do
      # Everything received: re-scan the file to confirm it is complete.
      send(self(), {:calc_missing, 1})
    end

    send_reqs(state)
  else
    state
  end
end

# Peer does not have a block we asked for. Nothing to do here: the pending
# request is dropped by its `{:req_timeout, id}` timer and then re-issued.
defp handle_peer_msg({:dont_have, _id}, _conn_pid, state), do: state

# Unknown or malformed message: ignore it.
defp handle_peer_msg(_msg, _conn_pid, state), do: state

# True when block `id` can be read from the local file: the missing-block map
# has been computed, `id` is a valid block index and the block is not missing.
defp piece_available?(%{missing: nil}, _id), do: false

defp piece_available?(state, id) when is_integer(id) and id >= 0 do
  state.missing[id] == nil and id < MT.block_count(get_mt(state))
end

defp piece_available?(_state, _id), do: false
# Compares the file on disk with the block hashes of the Merkle tree and
# returns the map of missing block ids => peers known to have the block.
# Peers already recorded in `state.missing` are kept.
defp scan_missing_blocks(state) do
  tree = get_mt(state)
  n_blocks = MT.block_count(tree)
  wanted = MT.get_range(tree, 0..(n_blocks-1))

  on_disk =
    if File.exists?(state.path) do
      state.path
      |> File.stream!([], MT.block_size())
      |> Enum.map(fn chunk -> :crypto.hash(:sha256, chunk) end)
    else
      []
    end

  # Blocks present on disk whose hash differs from the expected one.
  corrupt =
    for {want, got, idx} <- Enum.zip([wanted, on_disk, 0..n_blocks]), want != got, do: idx

  # Blocks past the end of the file on disk.
  n_on_disk = Enum.count(on_disk)
  absent = if n_on_disk < n_blocks, do: Enum.to_list(n_on_disk..(n_blocks-1)), else: []

  previous = if state.missing == nil, do: %{}, else: state.missing

  missing =
    Map.new(corrupt ++ absent, fn idx ->
      case Map.get(previous, idx) do
        nil -> {idx, []}
        peers -> {idx, peers}
      end
    end)

  Logger.info("Missing pieces: #{map_size missing} / #{n_blocks}")
  missing
end

# Recomputes which blocks of the file are missing.
#
# Does nothing (besides logging) while the info struct is unknown. When the
# Merkle tree is completely available, the file is scanned, the result is
# broadcast to the network group and block requests are issued. Otherwise the
# missing map is reset to nil, the page store is told again which root is
# wanted, and the computation is retried after `iter` seconds with an
# incremented `iter`.
def handle_info({:calc_missing, iter}, state) do
  if state.info == nil do
    Logger.info(":calc_missing for #{Base.encode16(state.id)} -> no info")
    {:noreply, state}
  else
    Logger.info(":calc_missing for #{Base.encode16(state.id)}")

    case GenServer.call(state.store, {:have_rec, state.info.merkle_root}) do
      true ->
        missing = scan_missing_blocks(state)
        state = %{state | missing: missing}
        SNet.Group.broadcast(state.netgroup, {state.id, nil, {:missing, Map.keys(missing), false}})
        {:noreply, send_reqs(state)}

      false ->
        Logger.info("Incomplete Merkle tree meta data, requesting info from peers.")
        GenServer.cast(state.store, {:set_roots, [state.info.merkle_root]})
        Process.send_after(self(), {:calc_missing, iter + 1}, iter * 1000)
        {:noreply, %{state | missing: nil}}
    end
  end
end
# A block request timed out. If it is still pending, forget it and try to
# issue new requests; otherwise there is nothing to do.
def handle_info({:req_timeout, id}, state) do
  case state.reqs[id] do
    nil ->
      {:noreply, state}

    _pending ->
      remaining = Map.delete(state.reqs, id)
      {:noreply, send_reqs(%{state | reqs: remaining})}
  end
end
# Issues block requests to peers, keeping at most @concurrent_reqs requests
# pending. Only blocks with at least one known peer and no pending request
# are considered, lowest block id first. Every request starts a timer that
# delivers `{:req_timeout, id}` after @req_timeout_msec milliseconds.
#
# Returns the state with the new requests recorded in `reqs`.
#
# The missing-block map may be nil (not computed yet, or reset because the
# Merkle tree turned out to be incomplete); in that case there is nothing
# to request and the state is returned unchanged instead of crashing.
defp send_reqs(%{missing: nil} = state), do: state

defp send_reqs(state) do
  free_slots = @concurrent_reqs - map_size(state.reqs)

  requestable =
    if free_slots > 0 do
      for {id, peers} <- state.missing, peers != [], state.reqs[id] == nil, do: id
    else
      []
    end

  requestable
  |> Enum.sort()
  |> Enum.take(max(free_slots, 0))
  |> Enum.reduce(state, fn id, acc ->
    who = a_random_peer(acc.missing[id])
    Logger.info("#{acc.id|>Base.encode16} | Req #{id} to #{inspect who}")
    SNet.Manager.send(who, {acc.id, nil, {:get, id}})
    Process.send_after(self(), {:req_timeout, id}, @req_timeout_msec)
    %{acc | reqs: Map.put(acc.reqs, id, who)}
  end)
end
# Reads block `id` from the file on disk and returns its content.
#
# Raises if the block is recorded as missing, if the file cannot be opened
# or if the read does not return data. The file is opened through
# `File.open!/3` with a function so that the handle is closed even when the
# read fails.
defp get_piece(state, id) do
  nil = state.missing[id]

  File.open!(state.path, [:read, :binary], fn fh ->
    {:ok, blk} = :file.pread(fh, id * MT.block_size(), MT.block_size())
    blk
  end)
end
# Writes block `id` with content `blk` to the file on disk and returns the
# state with the block removed from both the missing map and the pending
# requests.
#
# Raises if `blk` does not match the hash recorded in the Merkle tree, if the
# file cannot be opened or if the write fails. The file is opened through
# `File.open!/3` with a function so that the handle is closed even when the
# write fails.
defp put_piece(state, id, blk) do
  mt = get_mt(state)
  true = MT.get(mt, id) == :crypto.hash(:sha256, blk)

  File.open!(state.path, [:read, :write, :binary], fn fh ->
    :ok = :file.pwrite(fh, id * MT.block_size(), blk)
  end)

  %{state | missing: Map.delete(state.missing, id), reqs: Map.delete(state.reqs, id)}
end
# Picks a peer from a non-empty list, walking it from the front: each
# element but the last is chosen with probability 0.3, and the last element
# is chosen whenever it is reached. Earlier entries are therefore favoured.
defp a_random_peer(list) do
  case list do
    [candidate | others] when others != [] ->
      if :rand.uniform() >= 0.3, do: a_random_peer(others), else: candidate

    [last] ->
      last
  end
end
# Builds a handle on the file's Merkle tree: the root comes from the info
# struct and the pages are read through the shard's page store. Requires the
# info struct to be known.
defp get_mt(state) do
  page_store = %SApp.PageStore{pid: state.store, prefer_ask: []}
  %MT{root: state.info.merkle_root, store: page_store}
end
# =========
# INTERFACE
# =========
@doc """
Creates a file shard from the file at `path`.

Builds the Merkle tree and the info struct of the file (Merkle root, file
hash, size and the given `mime_type`), derives the infohash from the
serialized info struct, finds or starts the corresponding shard process and
asks it, asynchronously, to initialize itself from the file.

Returns `{:ok, manifest, pid}`. Raises if the file cannot be stat'ed.
"""
def create(path, mime_type) do
  stat = File.stat!(path)
  %File.Stat{size: size} = stat

  tree = MT.create(path)
  file_hash = SData.file_hash(path)

  info = %Info{
    merkle_root: tree.root,
    file_hash: file_hash,
    size: size,
    mime_type: mime_type
  }

  infobin = SData.term_bin(info)
  manifest = %Manifest{infohash: SData.bin_hash(infobin)}

  pid = Shard.Manager.find_or_start(manifest)
  GenServer.cast(pid, {:init_with, path, infobin, tree})

  {:ok, manifest, pid}
end
@doc """
Returns information about the file shard `pid`, including download progress.

The result is a keyword list that always contains `:infohash`; further keys
(`:file_hash`, `:size`, `:mime_type`, `:num_blocks`, `:missing_blocks`,
`:path`) are present depending on how much of the shard is known.
"""
def get_info(pid), do: GenServer.call(pid, :get_info)
end