aboutsummaryrefslogblamecommitdiff
path: root/shard/lib/app/file.ex
blob: ce28bebdd996f2b5bcc3707a2328ba195945aca3 (plain) (tree)





















                                                                                       


                        






















                                                                                    
                                                                                                     








                                              
                                              
 



                                                                                         

                                                                 









                                                               

                                                                         
                                                                                     


      













                                                                                               

                                                                                          















                                                            
                                                                 

                                                      
                                    

                                                                                         




                                     

                   



                         
























                                                                                                     







































































































                                                                                                                                   
                                                                            





















                                                                                 
                                                                               









































                                                                                               























                                                        
                        
     






                                                      
   
defmodule SApp.File do
  @moduledoc"""
  Shard application for a file identified by its infohash. The file cannot be modified.

  The infohash is the hash of an info struct containing:

      %Info{
        merkle_root: hash,
        file_hash: hash,
        size: int,
        mime_type: string
      }

  The file is cut in blocks of 4kb that are collected in a 64-ary Merkle tree.
  """

  use GenServer

  require Logger

  alias SData.MerkleTree, as: MT

  @concurrent_reqs 10
  @req_timeout_msec 2000

  defmodule Manifest do
    @moduledoc """
    Manifest for a file.

    The file is identified by its infohash, i.e. the hash of the serialized
    info struct (Merkle root, file hash, size and mime type).
    """
    defstruct [:infohash]

    defimpl Shard.Manifest do
      # Module implementing the shard described by this manifest.
      def module(_m), do: SApp.File

      # A manifest is valid only when its infohash is a 32-byte binary.
      # `is_binary/1` is checked first so that a malformed manifest (e.g. a nil
      # or non-binary infohash) yields `false` instead of raising from
      # `byte_size/1`.
      def is_valid?(m) do
        is_binary(m.infohash) and byte_size(m.infohash) == 32
      end
    end
  end

  defmodule Info do
    @moduledoc """
    Description of a file: Merkle root of its blocks, hash of the whole file,
    size and mime type. The infohash is the hash of this struct once serialized.
    """
    defstruct merkle_root: nil, file_hash: nil, size: nil, mime_type: nil
  end

  defmodule State do
    @moduledoc false

    # GenServer state of a file shard. Every field defaults to nil.
    #
    #   infohash - infohash taken from the manifest
    #   id       - shard id (term hash of the manifest)
    #   manifest - the manifest this shard was started with
    #   netgroup - network group used to talk to peers about this shard
    #   info     - decoded info struct, nil until known
    #   infobin  - serialized info struct, nil until known
    #   store    - pid of the page store holding the Merkle tree pages
    #   missing  - nil until computed, then a map: block index => peers known to have it
    #   path     - path of the file on local disk
    #   reqs     - map: block index => peer the block was requested from
    defstruct infohash: nil,
              id: nil,
              manifest: nil,
              netgroup: nil,
              info: nil,
              infobin: nil,
              store: nil,
              missing: nil,
              path: nil,
              reqs: nil
  end

  # Starts the GenServer for the file shard described by `manifest`.
  def start_link(manifest), do: GenServer.start_link(__MODULE__, manifest)

  # GenServer init callback.
  #
  # Derives the shard id from the manifest, announces this process to
  # Shard.Manager and to the shard's network group, starts the page store for
  # the Merkle tree pages, and reloads any previously saved info struct.
  def init(manifest) do
    %Manifest{infohash: infohash} = manifest
    id = SData.term_hash(manifest)
    Shard.Manager.dispatch_to(id, nil, self())

    netgroup = %SNet.PubShardGroup{id: id}
    SNet.Group.init_lookup(netgroup, self())

    # The local copy of the file lives under the data path, named after the hex shard id.
    path = Path.join([Application.get_env(:shard, :data_path), Base.encode16(id)])
    {:ok, store} = SApp.PageStore.start_link(id, :meta, netgroup)

    {infobin, info} =
      case Shard.Manager.load_state(id) do
        nil ->
          {nil, nil}

        saved_bin ->
          decoded = SData.term_unbin(saved_bin)
          # Info is already known: schedule a first scan for missing blocks and
          # give the page store the Merkle root.
          Process.send_after(self(), {:calc_missing, 1}, 100)
          GenServer.cast(store, {:set_roots, [decoded.merkle_root]})
          {saved_bin, decoded}
      end

    state = %State{
      id: id,
      infohash: infohash,
      manifest: manifest,
      netgroup: netgroup,
      infobin: infobin,
      info: info,
      store: store,
      missing: nil,
      path: path,
      reqs: %{}
    }

    {:ok, state}
  end

  # Replies with the manifest this shard was started with.
  def handle_call(:manifest, _from, state), do: {:reply, state.manifest, state}

  # Replies with a keyword list describing the file.
  #
  # - info unknown: only the infohash;
  # - info known: infohash, file hash, size and mime type;
  # - info known and the page store reports the whole Merkle tree present:
  #   additionally the block count, the number of missing blocks (nil while not
  #   yet computed) and the local path.
  def handle_call(:get_info, _from, state) do
    reply =
      case state.info do
        nil ->
          [infohash: state.infohash]

        info ->
          basic = [
            infohash: state.infohash,
            file_hash: info.file_hash,
            size: info.size,
            mime_type: info.mime_type
          ]

          if GenServer.call(state.store, {:have_rec, info.merkle_root}) do
            nblk = MT.block_count(get_mt(state))

            n_missing =
              case state.missing do
                nil -> nil
                missing -> map_size(missing)
              end

            basic ++ [num_blocks: nblk, missing_blocks: n_missing, path: state.path]
          else
            basic
          end
      end

    {:reply, reply, state}
  end

  # Reports this shard's dependency list to Shard.Manager; the list is always empty.
  def handle_cast(:send_deps, state) do
    no_deps = []
    GenServer.cast(Shard.Manager, {:dep_list, state.id, no_deps})
    {:noreply, state}
  end

  # Initializes the shard from a complete local file.
  #
  # `infobin` must hash to the manifest's infohash (asserted). The info is
  # persisted, every page of the Merkle tree `mt` is put in the page store, and
  # the file at `file_path` is copied to the shard's own path. The shard then
  # has no missing block and no outstanding request.
  def handle_cast({:init_with, file_path, infobin, mt}, state) do
    true = SData.bin_hash(infobin) == state.infohash
    Shard.Manager.save_state(state.id, infobin)
    info = SData.term_unbin(infobin)

    for {page_hash, page} <- mt.store.pages do
      page_store = %SApp.PageStore{pid: state.store, prefer_ask: []}
      # The store must file the page under the hash it had in the source tree.
      {^page_hash, _} = SData.PageStore.put(page_store, page)
    end

    File.copy!(file_path, state.path)

    {:noreply, %{state | infobin: infobin, info: info, missing: %{}, reqs: %{}}}
  end

  # A peer connected: send it an `:interested` message listing this shard's id.
  def handle_cast({:peer_connected, conn_pid}, state) do
    interest = {:interested, [state.id]}
    SNet.Manager.send_pid(conn_pid, interest)
    {:noreply, state}
  end

  # A peer declared interest in this shard. If it belongs to the shard's group:
  # - ask it for the info struct when we do not have it yet;
  # - otherwise, once the missing set has been computed, send it our info and
  #   the list of blocks we miss (asking for its own list in return);
  # - otherwise send nothing.
  def handle_cast({:interested, conn_pid, auth}, state) do
    if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
      case {state.info, state.missing} do
        {nil, _} ->
          SNet.Manager.send_pid(conn_pid, {state.id, nil, {:get_info}})

        {_info, nil} ->
          nil

        {_info, missing} ->
          SNet.Manager.send_pid(conn_pid, {state.id, nil, {:info, state.infobin}})
          SNet.Manager.send_pid(conn_pid, {state.id, nil, {:missing, Map.keys(missing), true}})
      end
    end

    {:noreply, state}
  end

  # Handles a shard message `msg` received from the peer behind `conn_pid`.
  #
  # Messages from peers outside the shard's group are ignored. Messages come
  # from the network, so anything malformed, unknown, or arriving before the
  # state needed to process it is available is dropped rather than allowed to
  # crash the shard process.
  def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do
    if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
      {:noreply, handle_peer_msg(msg, conn_pid, state)}
    else
      # Ignore message
      {:noreply, state}
    end
  end

  # Peer asks for the info struct: answer only if we have it.
  defp handle_peer_msg({:get_info}, conn_pid, state) do
    if state.infobin != nil do
      SNet.Manager.send_pid(conn_pid, {state.id, nil, {:info, state.infobin}})
    end

    state
  end

  # Peer sends the info struct: accept it only if we have none yet and it
  # hashes to our infohash, then persist it and schedule a missing-block scan.
  defp handle_peer_msg({:info, infobin}, _conn_pid, state) when is_binary(infobin) do
    if state.infobin == nil and SData.bin_hash(infobin) == state.infohash do
      Shard.Manager.save_state(state.id, infobin)
      state = %{state | infobin: infobin, info: SData.term_unbin(infobin)}
      GenServer.cast(state.store, {:set_roots, [state.info.merkle_root]})
      Process.send_after(self(), {:calc_missing, 1}, 100)
      state
    else
      state
    end
  end

  # Peer lists the blocks it misses: tell it which of those we are not missing
  # and, if it asked (`rep`), send back our own missing list.
  defp handle_peer_msg({:missing, missing_list, rep}, conn_pid, state)
       when is_list(missing_list) do
    if state.missing != nil do
      have_list = Enum.filter(missing_list, &(state.missing[&1] == nil))
      SNet.Manager.send_pid(conn_pid, {state.id, nil, {:have, have_list}})

      # `rep == true` rather than `rep and ...`: a non-boolean flag from the
      # peer must not raise.
      if rep == true and map_size(state.missing) > 0 do
        SNet.Manager.send_pid(
          conn_pid,
          {state.id, nil, {:missing, Map.keys(state.missing), false}}
        )
      end
    end

    state
  end

  # Peer lists blocks it has: remember it as a source (at most 10 per block)
  # for the blocks we miss, then issue requests.
  defp handle_peer_msg({:have, have_list}, conn_pid, state) when is_list(have_list) do
    if state.missing == nil do
      # Missing set not computed yet: nothing to record, and send_reqs needs a map.
      state
    else
      peer_info = GenServer.call(conn_pid, :get_peer_info)

      missing =
        Enum.reduce(have_list, state.missing, fn id, acc ->
          case acc[id] do
            nil ->
              acc

            peers ->
              if peer_info in peers do
                acc
              else
                Map.put(acc, id, Enum.take([peer_info | peers], 10))
              end
          end
        end)

      send_reqs(%{state | missing: missing})
    end
  end

  # Peer requests a block: send it if we hold it, otherwise say we don't.
  defp handle_peer_msg({:get, id}, conn_pid, state) do
    if can_serve_piece?(state, id) do
      piece = get_piece(state, id)
      SNet.Manager.send_pid(conn_pid, {state.id, nil, {:piece, id, piece}})
    else
      SNet.Manager.send_pid(conn_pid, {state.id, nil, {:dont_have, id}})
    end

    state
  end

  # Peer sends a block: keep it only if we are missing it and its hash matches
  # the Merkle tree, then request further blocks.
  defp handle_peer_msg({:piece, id, blk}, _conn_pid, state) when is_binary(blk) do
    # Checked before building the Merkle tree handle, which needs state.info.
    wanted? = state.info != nil and state.missing != nil and state.missing[id] != nil

    if wanted? and MT.get(get_mt(state), id) == :crypto.hash(:sha256, blk) do
      # put_piece also removes the block from the outstanding requests.
      state = put_piece(state, id, blk)

      if map_size(state.missing) == 0 do
        # Nothing left to fetch: rescan the file as a final verification.
        send(self(), {:calc_missing, 1})
      end

      send_reqs(state)
    else
      state
    end
  end

  # Peer does not have a block we requested: nothing to do here, the request
  # timeout frees the slot and triggers a new request.
  defp handle_peer_msg({:dont_have, _id}, _conn_pid, state), do: state

  # Unknown or malformed message: ignore.
  defp handle_peer_msg(_msg, _conn_pid, state), do: state

  # True when block `id` can be read from the local file: the missing set is
  # known, `id` is a valid block index, and the block is not marked missing.
  defp can_serve_piece?(state, id) do
    is_integer(id) and id >= 0 and state.info != nil and state.missing != nil and
      state.missing[id] == nil and id < MT.block_count(get_mt(state))
  end

  # Recomputes the set of missing blocks.
  #
  # Does nothing while the info struct is unknown. If the page store does not
  # yet hold the whole Merkle tree, the root is handed to the store again and
  # the scan is rescheduled after `iter` seconds, with `missing` reset to nil.
  # Otherwise the local file is hashed block by block and compared with the
  # hashes of the tree; blocks that differ or lie beyond the end of the file are
  # missing. Peers already known for a missing block are kept. The result is
  # broadcast to the group and requests are issued.
  def handle_info({:calc_missing, iter}, state) do
    case state.info do
      nil ->
        Logger.info(":calc_missing for #{Base.encode16(state.id)} -> no info")
        {:noreply, state}

      info ->
        Logger.info(":calc_missing for #{Base.encode16(state.id)}")

        missing =
          case GenServer.call(state.store, {:have_rec, info.merkle_root}) do
            true ->
              tree = get_mt(state)
              n_blocks = MT.block_count(tree)
              expected_hashes = MT.get_range(tree, 0..(n_blocks - 1))

              actual_hashes =
                if File.exists?(state.path) do
                  state.path
                  |> File.stream!([], MT.block_size())
                  |> Enum.map(&:crypto.hash(:sha256, &1))
                else
                  []
                end

              # Blocks present on disk whose hash differs from the tree.
              corrupted =
                for {want, got, index} <-
                      Enum.zip([expected_hashes, actual_hashes, 0..n_blocks]),
                    want != got,
                    do: index

              # Blocks past the current end of the file.
              n_on_disk = length(actual_hashes)

              absent =
                if n_on_disk < n_blocks do
                  Enum.to_list(n_on_disk..(n_blocks - 1))
                else
                  []
                end

              known_peers = if state.missing == nil, do: %{}, else: state.missing

              found =
                Map.new(corrupted ++ absent, fn index ->
                  case known_peers[index] do
                    nil -> {index, []}
                    peers -> {index, peers}
                  end
                end)

              Logger.info("Missing pieces: #{map_size(found)} / #{n_blocks}")
              found

            false ->
              Logger.info("Incomplete Merkle tree meta data, requesting info from peers.")
              GenServer.cast(state.store, {:set_roots, [info.merkle_root]})
              Process.send_after(self(), {:calc_missing, iter + 1}, iter * 1000)
              nil
          end

        state = %{state | missing: missing}

        if missing == nil do
          {:noreply, state}
        else
          SNet.Group.broadcast(
            state.netgroup,
            {state.id, nil, {:missing, Map.keys(missing), false}}
          )

          {:noreply, send_reqs(state)}
        end
    end
  end

  # A request timed out: if block `id` is still recorded as requested, forget
  # the request and issue new ones; otherwise there is nothing to do.
  def handle_info({:req_timeout, id}, state) do
    case state.reqs[id] do
      nil ->
        {:noreply, state}

      _pending ->
        freed = %{state | reqs: Map.delete(state.reqs, id)}
        {:noreply, send_reqs(freed)}
    end
  end

  # Requests missing blocks from peers, keeping at most @concurrent_reqs
  # requests outstanding.
  #
  # Candidates are missing blocks with at least one known peer and no
  # outstanding request; the lowest indices are requested first, each from a
  # peer picked by a_random_peer/1. A {:req_timeout, id} message is scheduled
  # for every request. Returns the state with the new requests recorded.
  #
  # While the missing set has not been computed (`missing` is nil) there is
  # nothing to request; the state is returned unchanged instead of crashing on
  # the enumeration of nil.
  defp send_reqs(%{missing: nil} = state), do: state

  defp send_reqs(state) do
    can_req = for {k, v} <- state.missing, v != [], state.reqs[k] == nil, do: k
    n_curr_req = map_size(state.reqs)

    if can_req != [] and n_curr_req < @concurrent_reqs do
      pieces = can_req |> Enum.sort() |> Enum.take(@concurrent_reqs - n_curr_req)

      Enum.reduce(pieces, state, fn id, state ->
        who = a_random_peer(state.missing[id])
        Logger.info("#{state.id|>Base.encode16} | Req #{id} to #{inspect who}")
        SNet.Manager.send(who, {state.id, nil, {:get, id}})
        Process.send_after(self(), {:req_timeout, id}, @req_timeout_msec)
        put_in(state.reqs[id], who)
      end)
    else
      state
    end
  end

  # Reads block `id` from the local file and returns it as a binary.
  #
  # Asserts that the block is not marked missing and that the read succeeds.
  # The file is opened with the function form of File.open!/3 so the handle is
  # closed even when the read does not return {:ok, _} and the match raises.
  defp get_piece(state, id) do
    nil = state.missing[id]
    block_size = MT.block_size()

    {:ok, blk} =
      File.open!(state.path, [:read, :binary], fn fh ->
        :file.pread(fh, id * block_size, block_size)
      end)

    blk
  end

  # Writes block `id` to the local file and returns the state with the block
  # removed from both the missing set and the outstanding requests.
  #
  # Asserts that `blk` hashes to the value recorded in the Merkle tree and that
  # the write succeeds. The file is opened with the function form of
  # File.open!/3 so the handle is closed even when the write fails and the
  # match raises.
  defp put_piece(state, id, blk) do
    mt = get_mt(state)
    true = (MT.get(mt, id) == :crypto.hash(:sha256, blk))

    :ok =
      File.open!(state.path, [:read, :write, :binary], fn fh ->
        :file.pwrite(fh, id * MT.block_size(), blk)
      end)

    %{state | missing: Map.delete(state.missing, id),
              reqs: Map.delete(state.reqs, id)}
  end

  # Picks a peer from a non-empty list: walking from the head, each element is
  # taken with probability 0.3; the last element is taken if none was before.
  defp a_random_peer(list) do
    case list do
      [only] ->
        only

      [first | others] ->
        take_first? = :rand.uniform() < 0.3
        if take_first?, do: first, else: a_random_peer(others)
    end
  end

  # Builds a Merkle tree handle rooted at the file's Merkle root and backed by
  # the shard's page store. Requires state.info to be set.
  defp get_mt(state) do
    root = state.info.merkle_root
    page_store = %SApp.PageStore{pid: state.store, prefer_ask: []}
    %MT{root: root, store: page_store}
  end

  # =========
  # INTERFACE
  # =========
  
  @doc """
  Create a File shard from a file path.

  Builds the Merkle tree and the info struct of the file at `path`, finds or
  starts the shard for the resulting manifest and asks it to initialize itself
  from the file.

  Returns `{:ok, manifest, pid}`.
  """
  def create(path, mime_type) do
    %File.Stat{size: size} = File.stat!(path)
    mt = MT.create(path)

    infobin =
      SData.term_bin(%Info{
        merkle_root: mt.root,
        file_hash: SData.file_hash(path),
        size: size,
        mime_type: mime_type
      })

    manifest = %Manifest{infohash: SData.bin_hash(infobin)}
    pid = Shard.Manager.find_or_start(manifest)
    GenServer.cast(pid, {:init_with, path, infobin, mt})

    {:ok, manifest, pid}
  end

  @doc """
  Get info of a file shard including download progress.

  Returns a keyword list; which keys are present depends on how much the shard
  already knows about the file.
  """
  def get_info(pid), do: GenServer.call(pid, :get_info)
end