defmodule SApp.File do
  @moduledoc """
  Shard application for a file identified by its infohash. The file cannot be modified.

  The infohash is the hash of an info struct containing:

      %Info{
        merkle_root: hash
        file_hash: hash
        size: int
        mime_type: string
      }

  The file is cut in blocks of 4kb that are collected in a 64-ary Merkle tree.

  ## Peer messages

  Messages exchanged with peers for this shard (handled in `handle_cast({:msg, ...})`):

    * `{:get_info}` - ask for the serialized info struct
    * `{:info, infobin}` - the serialized info struct
    * `{:missing, ids, rep}` - block ids the sender is missing; `rep` asks for the
      receiver's own missing list in return
    * `{:have, ids}` - block ids the sender can provide
    * `{:get, id}` - request for a block
    * `{:piece, id, blk}` - a block
    * `{:dont_have, id}` - the sender cannot provide the requested block

  Any other message, or a message that arrives before the local state allows
  handling it, is ignored.
  """

  use GenServer

  require Logger
  alias SData.MerkleTree, as: MT

  # Maximum number of block requests in flight at any time.
  @concurrent_reqs 10
  # Delay after which an unanswered block request is forgotten, so it can be re-issued.
  @req_timeout_msec 2000
  # Maximum number of candidate peers remembered for each missing block.
  @max_peers_per_piece 10

  defmodule Manifest do
    @moduledoc """
    Manifest for a file.
    The file is identified by its infohash (the hash of its serialized info struct).
    """

    defstruct [:infohash]

    defimpl Shard.Manifest do
      def module(_m), do: SApp.File

      # `byte_size/1` raises on anything that is not a binary, so check the type
      # first: an invalid manifest must yield `false`, not an exception.
      def is_valid?(m) do
        is_binary(m.infohash) and byte_size(m.infohash) == 32
      end
    end
  end

  defmodule Info do
    @moduledoc """
    A file info struct.
    """

    defstruct [:merkle_root, :file_hash, :size, :mime_type]
  end

  defmodule State do
    @moduledoc false

    # missing: nil while the set of missing blocks is unknown, otherwise a map
    #          %{block_id => [peers known to have the block]}
    # reqs:    %{block_id => peer the block is currently being requested from}
    defstruct [:infohash, :id, :manifest, :netgroup, :info, :infobin, :store, :missing, :path, :reqs]
  end

  @doc """
  Starts the shard process for the given manifest.
  """
  @spec start_link(%Manifest{}) :: GenServer.on_start()
  def start_link(manifest) do
    GenServer.start_link(__MODULE__, manifest)
  end

  @impl true
  def init(manifest) do
    %Manifest{infohash: infohash} = manifest
    id = SData.term_hash(manifest)

    Shard.Manager.dispatch_to(id, nil, self())

    netgroup = %SNet.PubShardGroup{id: id}
    SNet.Group.init_lookup(netgroup, self())

    path = Path.join([Application.get_env(:shard, :data_path), Base.encode16(id)])

    {:ok, store} = SApp.PageStore.start_link(id, :meta, netgroup)

    {infobin, info} =
      case Shard.Manager.load_state(id) do
        nil ->
          {nil, nil}

        infobin ->
          info = SData.term_unbin(infobin)
          # The info struct is already known: schedule a scan of the local file.
          Process.send_after(self(), {:calc_missing, 1}, 100)
          GenServer.cast(store, {:set_roots, [info.merkle_root]})
          {infobin, info}
      end

    {:ok,
     %State{
       id: id,
       infohash: infohash,
       manifest: manifest,
       netgroup: netgroup,
       infobin: infobin,
       info: info,
       store: store,
       missing: nil,
       path: path,
       reqs: %{}
     }}
  end

  @impl true
  def handle_call(:manifest, _from, state) do
    {:reply, state.manifest, state}
  end

  def handle_call(:delete_shard, _from, state) do
    GenServer.call(state.store, :delete_store)
    # The result is deliberately not matched: the data file may not exist yet.
    File.rm(state.path)
    {:stop, :normal, :ok, state}
  end

  def handle_call(:get_info, _from, state) do
    reply =
      cond do
        state.info != nil and GenServer.call(state.store, {:have_rec, state.info.merkle_root}) ->
          nblk = MT.block_count(get_mt(state))

          [
            infohash: state.infohash,
            file_hash: state.info.file_hash,
            size: state.info.size,
            mime_type: state.info.mime_type,
            num_blocks: nblk,
            # nil while the missing set has not been computed yet
            missing_blocks: if(state.missing != nil, do: map_size(state.missing)),
            path: state.path
          ]

        state.info != nil ->
          [
            infohash: state.infohash,
            file_hash: state.info.file_hash,
            size: state.info.size,
            mime_type: state.info.mime_type
          ]

        true ->
          [infohash: state.infohash]
      end

    {:reply, reply, state}
  end

  @impl true
  def handle_cast(:send_deps, state) do
    GenServer.cast(Shard.Manager, {:dep_list, state.id, []})
    {:noreply, state}
  end

  def handle_cast({:init_with, file_path, infobin, mt}, state) do
    # The provided info must be the one this shard is identified by.
    true = SData.bin_hash(infobin) == state.infohash

    Shard.Manager.save_state(state.id, infobin)
    info = SData.term_unbin(infobin)

    # Copy every page of the freshly built Merkle tree into this shard's page store.
    for {k, v} <- mt.store.pages do
      {^k, _} = SData.PageStore.put(%SApp.PageStore{pid: state.store, prefer_ask: []}, v)
    end

    File.copy!(file_path, state.path)

    # The whole file is available locally: nothing is missing, nothing is requested.
    new_state = %{state | infobin: infobin, info: info, missing: %{}, reqs: %{}}
    {:noreply, new_state}
  end

  def handle_cast({:peer_connected, conn_pid}, state) do
    SNet.Manager.send_pid(conn_pid, {:interested, [state.id]})
    {:noreply, state}
  end

  def handle_cast({:interested, conn_pid, auth}, state) do
    if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
      cond do
        state.info == nil ->
          SNet.Manager.send_pid(conn_pid, {state.id, nil, {:get_info}})

        state.missing != nil ->
          SNet.Manager.send_pid(conn_pid, {state.id, nil, {:info, state.infobin}})

          SNet.Manager.send_pid(
            conn_pid,
            {state.id, nil, {:missing, Map.keys(state.missing), true}}
          )

        true ->
          nil
      end
    end

    {:noreply, state}
  end

  def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do
    if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
      {:noreply, handle_msg(msg, conn_pid, state)}
    else
      # Ignore message
      {:noreply, state}
    end
  end

  @impl true
  def handle_info({:calc_missing, iter}, state) do
    if state.info != nil do
      Logger.info(":calc_missing for #{Base.encode16(state.id)}")

      missing =
        case GenServer.call(state.store, {:have_rec, state.info.merkle_root}) do
          true ->
            scan_missing(state)

          false ->
            Logger.info("Incomplete Merkle tree meta data, requesting info from peers.")
            GenServer.cast(state.store, {:set_roots, [state.info.merkle_root]})
            # Retry later; the delay grows with the number of attempts.
            Process.send_after(self(), {:calc_missing, iter + 1}, iter * 1000)
            nil
        end

      state = %{state | missing: missing}

      state =
        if missing != nil do
          SNet.Group.broadcast(
            state.netgroup,
            {state.id, nil, {:missing, Map.keys(missing), false}}
          )

          send_reqs(state)
        else
          state
        end

      {:noreply, state}
    else
      Logger.info(":calc_missing for #{Base.encode16(state.id)} -> no info")
      {:noreply, state}
    end
  end

  def handle_info({:req_timeout, id}, state) do
    if state.reqs[id] != nil do
      state = %{state | reqs: Map.delete(state.reqs, id)}
      {:noreply, send_reqs(state)}
    else
      {:noreply, state}
    end
  end

  # ================
  # PEER MESSAGES
  # ================

  # Handles one message from a peer that passed the group check and returns the
  # new state. Peer input is not trusted: guards reject malformed payloads and
  # the final clause ignores anything unexpected instead of crashing the shard.

  defp handle_msg({:get_info}, conn_pid, state) do
    if state.infobin != nil do
      SNet.Manager.send_pid(conn_pid, {state.id, nil, {:info, state.infobin}})
    end

    state
  end

  defp handle_msg({:info, infobin}, _conn_pid, state) do
    # Only accept the info once, and only if it hashes to our infohash.
    if state.infobin == nil and SData.bin_hash(infobin) == state.infohash do
      Shard.Manager.save_state(state.id, infobin)
      state = %{state | infobin: infobin, info: SData.term_unbin(infobin)}
      GenServer.cast(state.store, {:set_roots, [state.info.merkle_root]})
      Process.send_after(self(), {:calc_missing, 1}, 100)
      state
    else
      state
    end
  end

  defp handle_msg({:missing, missing_list, rep}, conn_pid, state) when is_list(missing_list) do
    if state.missing != nil do
      # Every requested id that is not in our own missing map is advertised.
      have_list = Enum.filter(missing_list, &(state.missing[&1] == nil))
      SNet.Manager.send_pid(conn_pid, {state.id, nil, {:have, have_list}})

      # `rep == true` rather than `rep and ...`: a non-boolean must not raise.
      if rep == true and map_size(state.missing) > 0 do
        SNet.Manager.send_pid(
          conn_pid,
          {state.id, nil, {:missing, Map.keys(state.missing), false}}
        )
      end
    end

    state
  end

  # Only meaningful once the missing set is known (hence the is_map guard);
  # otherwise the message falls through to the ignoring clause.
  defp handle_msg({:have, have_list}, conn_pid, %State{missing: missing} = state)
       when is_list(have_list) and is_map(missing) do
    peer_info = GenServer.call(conn_pid, :get_peer_info)

    missing =
      Enum.reduce(have_list, missing, fn id, acc ->
        case acc do
          %{^id => peers} ->
            if peer_info in peers do
              acc
            else
              # Most recent peer first, capped to bound memory use.
              Map.put(acc, id, Enum.take([peer_info | peers], @max_peers_per_piece))
            end

          _ ->
            # Not a block we are missing.
            acc
        end
      end)

    send_reqs(%{state | missing: missing})
  end

  defp handle_msg({:get, id}, conn_pid, state) do
    reply =
      with true <- state.missing != nil and state.missing[id] == nil,
           {:ok, piece} <- get_piece(state, id) do
        {:piece, id, piece}
      else
        # Unknown missing set, block missing locally, invalid id or read failure.
        _ -> {:dont_have, id}
      end

    SNet.Manager.send_pid(conn_pid, {state.id, nil, reply})
    state
  end

  defp handle_msg({:piece, id, blk}, _conn_pid, %State{missing: missing} = state)
       when is_map(missing) and is_binary(blk) do
    # Accept the block only if we still miss it and it matches the Merkle tree.
    if missing[id] != nil and MT.get(get_mt(state), id) == :crypto.hash(:sha256, blk) do
      state = put_piece(state, id, blk)

      # Download complete: re-verify the whole file.
      if map_size(state.missing) == 0 do
        send(self(), {:calc_missing, 1})
      end

      send_reqs(state)
    else
      state
    end
  end

  defp handle_msg({:dont_have, id}, conn_pid, %State{missing: missing} = state)
       when is_map(missing) do
    case missing do
      %{^id => peers} ->
        peer_info = GenServer.call(conn_pid, :get_peer_info)
        # Stop considering this peer for this block, and free the request slot
        # if the pending request was addressed to it.
        missing = Map.put(missing, id, List.delete(peers, peer_info))

        reqs =
          if state.reqs[id] == peer_info do
            Map.delete(state.reqs, id)
          else
            state.reqs
          end

        send_reqs(%{state | missing: missing, reqs: reqs})

      _ ->
        state
    end
  end

  defp handle_msg(_msg, _conn_pid, state) do
    Logger.debug("#{Base.encode16(state.id)} | ignoring unexpected peer message")
    state
  end

  # ================
  # PRIVATE HELPERS
  # ================

  # Compares the local file against the Merkle tree and returns the map
  # %{block_id => [peers]} of blocks that are absent or have a wrong hash.
  # Peer lists already known in `state.missing` are carried over.
  defp scan_missing(state) do
    meta = get_mt(state)
    n_blocks = MT.block_count(meta)
    # NOTE(review): for n_blocks == 0 this is the range 0..-1; the behaviour of
    # MT.get_range/2 on it is not visible from here - confirm for empty files.
    expected_hashes = MT.get_range(meta, 0..(n_blocks - 1))

    actual_hashes =
      if File.exists?(state.path) do
        state.path
        |> File.stream!([], MT.block_size())
        |> Enum.map(&:crypto.hash(:sha256, &1))
      else
        []
      end

    n_actual = length(actual_hashes)

    # Blocks present in the file whose hash differs from the expected one.
    missing_1 =
      [expected_hashes, actual_hashes, 0..n_blocks]
      |> Enum.zip()
      |> Enum.filter(fn {a, b, _} -> a != b end)
      |> Enum.map(&elem(&1, 2))

    # Blocks beyond the end of the local file.
    missing_2 =
      if n_actual < n_blocks do
        Enum.to_list(n_actual..(n_blocks - 1))
      else
        []
      end

    known = state.missing || %{}

    missing =
      for k <- missing_1 ++ missing_2, into: %{} do
        {k, Map.get(known, k, [])}
      end

    Logger.info("Missing pieces: #{map_size(missing)} / #{n_blocks}")
    missing
  end

  # Issues block requests, lowest block ids first, until @concurrent_reqs are in
  # flight. Only blocks with at least one known peer and no pending request are
  # eligible. Returns the state with the new requests recorded in `reqs`.
  defp send_reqs(%State{missing: nil} = state), do: state

  defp send_reqs(state) do
    can_req = for {k, v} <- state.missing, v != [], state.reqs[k] == nil, do: k
    n_curr_req = map_size(state.reqs)

    if can_req != [] and n_curr_req < @concurrent_reqs do
      can_req
      |> Enum.sort()
      |> Enum.take(@concurrent_reqs - n_curr_req)
      |> Enum.reduce(state, fn id, state ->
        who = a_random_peer(state.missing[id])
        Logger.info("#{Base.encode16(state.id)} | Req #{id} to #{inspect(who)}")
        SNet.Manager.send(who, {state.id, nil, {:get, id}})
        Process.send_after(self(), {:req_timeout, id}, @req_timeout_msec)
        put_in(state.reqs[id], who)
      end)
    else
      state
    end
  end

  # Reads block `id` from the local file.
  # Returns `{:ok, binary}`, or `:error` when the id is not a non-negative
  # integer, the file cannot be opened, or the read fails or hits end of file.
  defp get_piece(state, id) when is_integer(id) and id >= 0 do
    offset = id * MT.block_size()
    size = MT.block_size()

    # The function form of File.open/3 closes the file whatever the outcome.
    case File.open(state.path, [:read, :binary], &:file.pread(&1, offset, size)) do
      {:ok, {:ok, blk}} -> {:ok, blk}
      _ -> :error
    end
  end

  defp get_piece(_state, _id), do: :error

  # Writes a verified block to the local file and marks it as no longer missing
  # nor requested. Crashes if the block does not match the Merkle tree or if the
  # write fails.
  defp put_piece(state, id, blk) do
    mt = get_mt(state)
    true = MT.get(mt, id) == :crypto.hash(:sha256, blk)

    offset = id * MT.block_size()
    :ok = File.open!(state.path, [:read, :write, :binary], &:file.pwrite(&1, offset, blk))

    %{state | missing: Map.delete(state.missing, id), reqs: Map.delete(state.reqs, id)}
  end

  # Picks a peer from a non-empty list, favouring the head: each element is
  # chosen with probability 0.3, and the last one is the fallback.
  defp a_random_peer([peer]), do: peer

  defp a_random_peer([peer | rest]) do
    if :rand.uniform() < 0.3 do
      peer
    else
      a_random_peer(rest)
    end
  end

  # Merkle tree handle backed by this shard's page store. Requires `state.info`.
  defp get_mt(state) do
    %MT{
      root: state.info.merkle_root,
      store: %SApp.PageStore{pid: state.store, prefer_ask: []}
    }
  end

  # =========
  # INTERFACE
  # =========

  @doc """
  Create a File shard from a file path.

  Builds the Merkle tree and the info struct of the file at `path`, finds or
  starts the corresponding shard process and hands it the file.

  Returns `{:ok, manifest, pid}`.
  """
  def create(path, mime_type) do
    %File.Stat{size: size} = File.stat!(path)
    mt = MT.create(path)
    hash = SData.file_hash(path)

    info = %Info{
      merkle_root: mt.root,
      file_hash: hash,
      size: size,
      mime_type: mime_type
    }

    infobin = SData.term_bin(info)
    infohash = SData.bin_hash(infobin)
    manifest = %Manifest{infohash: infohash}

    pid = Shard.Manager.find_or_start(manifest)
    GenServer.cast(pid, {:init_with, path, infobin, mt})

    {:ok, manifest, pid}
  end

  @doc """
  Get info of a file shard including download progress.

  Returns a keyword list that always contains `:infohash`. Once the info struct
  is known it also contains `:file_hash`, `:size` and `:mime_type`; once the
  Merkle tree is complete it also contains `:num_blocks`, `:missing_blocks`
  (`nil` while unknown) and `:path`.
  """
  @spec get_info(GenServer.server()) :: keyword()
  def get_info(pid) do
    GenServer.call(pid, :get_info)
  end
end