aboutsummaryrefslogtreecommitdiff
path: root/shard/lib/app/file.ex
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2018-11-05 15:03:31 +0100
committerAlex Auvolat <alex@adnab.me>2018-11-05 15:03:31 +0100
commit72906c6bb473ea605235c84b6d01c318f7b6cef8 (patch)
tree904bb90dea4642ccce55a20145a2349f4f7aaf25 /shard/lib/app/file.ex
parenta26dd9284352000cca6338b68c03594dcd900494 (diff)
downloadshard-72906c6bb473ea605235c84b6d01c318f7b6cef8.tar.gz
shard-72906c6bb473ea605235c84b6d01c318f7b6cef8.zip
File shard quite complete (but not perfect)
Diffstat (limited to 'shard/lib/app/file.ex')
-rw-r--r--shard/lib/app/file.ex261
1 files changed, 250 insertions, 11 deletions
diff --git a/shard/lib/app/file.ex b/shard/lib/app/file.ex
index a874222..b28f742 100644
--- a/shard/lib/app/file.ex
+++ b/shard/lib/app/file.ex
@@ -20,6 +20,9 @@ defmodule SApp.File do
alias SData.MerkleTree, as: MT
+ @concurrent_reqs 10
+ @req_timeout_msec 2000
+
defmodule Manifest do
@moduledoc"""
Manifest for a file.
@@ -43,7 +46,7 @@ defmodule SApp.File do
end
defmodule State do
- defstruct [:infohash, :id, :manifest, :netgroup, :info, :infobin, :store, :missing, :path]
+ defstruct [:infohash, :id, :manifest, :netgroup, :info, :infobin, :store, :missing, :path, :reqs]
end
def start_link(manifest) do
@@ -53,39 +56,274 @@ defmodule SApp.File do
def init(manifest) do
%Manifest{infohash: infohash} = manifest
id = SData.term_hash manifest
-
Shard.Manager.dispatch_to(id, nil, self())
- {infobin, info} = case Shard.Manager.load_state(id) do
- nil -> {nil, nil}
- infobin -> {infobin, SData.term_unbin infobin}
- end
+
netgroup = %SNet.PubShardGroup{id: id}
SNet.Group.init_lookup(netgroup, self())
path = [Application.get_env(:shard, :data_path), "#{id|>Base.encode16}"] |> Path.join
-
{:ok, store} = SApp.PageStore.start_link(id, :meta, netgroup)
+ {infobin, info} = case Shard.Manager.load_state(id) do
+ nil ->
+ {nil, nil}
+ infobin ->
+ info = SData.term_unbin(infobin)
+ Process.send_after(self(), {:calc_missing, 1}, 100)
+ GenServer.cast(store, {:set_roots, [info.merkle_root]})
+ {infobin, info}
+ end
+
{:ok, %State{
id: id, infohash: infohash, manifest: manifest, netgroup: netgroup,
- infobin: infobin, info: info, store: store, missing: nil, path: path
+ infobin: infobin, info: info, store: store, missing: nil, path: path, reqs: %{}
}}
end
+ def handle_call(:manifest, _from, state) do
+ {:reply, state.manifest, state}
+ end
+
+ def handle_call(:get_info, _from, state) do
+ reply = cond do
+ state.info != nil and GenServer.call(state.store, {:have_rec, state.info.merkle_root}) ->
+ mt = get_mt(state)
+ nblk = MT.block_count(mt)
+ [infohash: state.infohash,
+ file_hash: state.info.file_hash,
+ size: state.info.size,
+ mime_type: state.info.mime_type,
+ num_blocks: nblk,
+ missing_blocks: if state.missing != nil do map_size(state.missing) else nil end]
+ state.info != nil ->
+ [infohash: state.infohash,
+ file_hash: state.info.file_hash,
+ size: state.info.size,
+ mime_type: state.info.mime_type]
+ true ->
+ [infohash: state.infohash]
+ end
+ {:reply, reply, state}
+ end
+
+ def handle_cast(:send_deps, state) do
+ GenServer.cast(Shard.Manager, {:dep_list, state.id, []})
+ {:noreply, state}
+ end
+
def handle_cast({:init_with, file_path, infobin, mt}, state) do
+ true = (SData.bin_hash(infobin) == state.infohash)
+ Shard.Manager.save_state(state.id, infobin)
info = SData.term_unbin(infobin)
- for {k, v} <- mt.store do
- {^k, _} = SData.PageStore.put(state.store, v)
+ for {k, v} <- mt.store.pages do
+ {^k, _} = SData.PageStore.put(%SApp.PageStore{pid: state.store, prefer_ask: []}, v)
end
File.copy!(file_path, state.path)
new_state = %{state |
infobin: infobin,
info: info,
+ missing: %{},
+ reqs: %{},
}
{:noreply, new_state}
end
- # TODO networking etc
+ def handle_cast({:peer_connected, conn_pid}, state) do
+ SNet.Manager.send_pid(conn_pid, {:interested, [state.id]})
+ {:noreply, state}
+ end
+
+ def handle_cast({:interested, conn_pid, auth}, state) do
+ if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
+ cond do
+ state.info == nil ->
+ SNet.Manager.send_pid(conn_pid, {state.id, nil, {:get_info}})
+ state.missing != nil ->
+ SNet.Manager.send_pid(conn_pid, {state.id, nil, {:info, state.infobin}})
+ SNet.Manager.send_pid(conn_pid, {state.id, nil, {:missing, Map.keys(state.missing), true}})
+ true ->
+ nil
+ end
+ end
+ {:noreply, state}
+ end
+
+ def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do
+ if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do
+ # Ignore message
+ {:noreply, state}
+ else
+ Logger.info("<- #{inspect msg}")
+ state = case msg do
+ {:get_info} ->
+ if state.infobin != nil do
+ SNet.Manager.send_pid(conn_pid, {state.id, nil, {:info, state.infobin}})
+ end
+ state
+ {:info, infobin} ->
+ if state.infobin == nil and SData.bin_hash(infobin) == state.infohash do
+ Shard.Manager.save_state(state.id, infobin)
+ state = %{state | infobin: infobin, info: SData.term_unbin(infobin)}
+ GenServer.cast(state.store, {:set_roots, [state.info.merkle_root]})
+ Process.send_after(self(), {:calc_missing, 1}, 100)
+ state
+ else
+ state
+ end
+ {:missing, missing_list, rep} ->
+ if state.missing != nil do
+ have_list = missing_list |> Enum.filter(&(state.missing[&1] == nil))
+ SNet.Manager.send_pid(conn_pid, {state.id, nil, {:have, have_list}})
+ if rep and map_size(state.missing) > 0 do
+ SNet.Manager.send_pid(conn_pid, {state.id, nil, {:missing, Map.keys(state.missing), false}})
+ end
+ end
+ state
+ {:have, have_list} ->
+ peer_info = GenServer.call(conn_pid, :get_peer_info)
+ missing = Enum.reduce(have_list, state.missing, fn id, missing ->
+ if missing[id] != nil and peer_info not in missing[id] do
+ new_have = [peer_info | missing[id]]
+ put_in(missing[id], Enum.take(new_have, 10))
+ else
+ missing
+ end
+ end)
+ state = %{state | missing: missing}
+ send_reqs(state)
+ {:get, id} ->
+ if state.missing[id] == nil do
+ piece = get_piece(state, id)
+ SNet.Manager.send_pid(conn_pid, {state.id, nil, {:piece, id, piece}})
+ else
+ SNet.Manager.send_pid(conn_pid, {state.id, nil, {:dont_have, id}})
+ end
+ state
+ {:piece, id, blk} ->
+ mt = get_mt(state)
+ if state.missing[id] != nil and MT.get(mt, id) == :crypto.hash(:sha256, blk) do
+ state = put_piece(state, id, blk)
+ if map_size(state.missing) == 0 do
+ send(self(), {:calc_missing, 1})
+ end
+ state = put_in(state.reqs, Map.delete(state.reqs, id))
+ send_reqs(state)
+ else
+ state
+ end
+ end
+ {:noreply, state}
+ end
+ end
+
+ def handle_info({:calc_missing, iter}, state) do
+ if state.info != nil do
+ Logger.info(":calc_missing for #{state.id|>Base.encode16}")
+ missing = case GenServer.call(state.store, {:have_rec, state.info.merkle_root}) do
+ true ->
+ meta = get_mt(state)
+ n_blocks = MT.block_count(meta)
+ expected_hashes = MT.get_range(meta, 0..(n_blocks-1))
+ actual_hashes = if File.exists?(state.path) do
+ File.stream!(state.path, [], MT.block_size())
+ |> Enum.map(&(:crypto.hash(:sha256, &1)))
+ else
+ []
+ end
+ missing_1 = Enum.zip([expected_hashes, actual_hashes, 0..n_blocks])
+ |> Enum.filter(fn {a, b, _} -> a != b end)
+ |> Enum.map(&(elem(&1, 2)))
+ missing_2 = if Enum.count(actual_hashes) < n_blocks do Enum.to_list((Enum.count actual_hashes)..(n_blocks-1)) else [] end
+ missing = for k <- missing_1 ++ missing_2, into: %{} do
+ if state.missing != nil and state.missing[k] != nil do
+ {k, state.missing[k]}
+ else
+ {k, []}
+ end
+ end
+ Logger.info("Missing pieces: #{map_size missing} / #{n_blocks}")
+ missing
+ false ->
+ Logger.info("Incomplete Merkle tree meta data, requesting info from peers.")
+ GenServer.cast(state.store, {:set_roots, [state.info.merkle_root]})
+ Process.send_after(self(), {:calc_missing, iter + 1}, iter * 1000)
+ nil
+ end
+ state = %{state | missing: missing}
+ state = if missing != nil do
+ SNet.Group.broadcast(state.netgroup, {state.id, nil, {:missing, Map.keys(missing), false}})
+ send_reqs(state)
+ else
+ state
+ end
+ {:noreply, state}
+ else
+ Logger.info(":calc_missing for #{state.infohash|>Base.encode16} -> no info")
+ {:noreply, state}
+ end
+ end
+
+ def handle_info({:req_timeout, id}, state) do
+ if state.reqs[id] != nil do
+ state = put_in(state.reqs, Map.delete(state.reqs, id))
+ state = send_reqs(state)
+ {:noreply, state}
+ else
+ {:noreply, state}
+ end
+ end
+
+ defp send_reqs(state) do
+ can_req = for {k, v} <- state.missing, v != [], state.reqs[k] == nil, do: k
+ n_curr_req = map_size(state.reqs)
+
+ if can_req != [] and n_curr_req < @concurrent_reqs do
+ pieces = can_req |> Enum.sort() |> Enum.take(@concurrent_reqs - n_curr_req)
+ Enum.reduce(pieces, state, fn id, state ->
+ who = a_random_peer(state.missing[id])
+ Logger.info("Req #{id} to #{inspect who}")
+ SNet.Manager.send(who, {state.id, nil, {:get, id}})
+ Process.send_after(self(), {:req_timeout, id}, @req_timeout_msec)
+ put_in(state.reqs[id], who)
+ end)
+ else
+ state
+ end
+ end
+
+ defp get_piece(state, id) do
+ nil = state.missing[id]
+ fh = File.open!(state.path, [:read, :binary])
+ {:ok, blk} = :file.pread(fh, id * MT.block_size(), MT.block_size())
+ :file.close(fh)
+ blk
+ end
+
+ defp put_piece(state, id, blk) do
+ mt = get_mt(state)
+ true = (MT.get(mt, id) == :crypto.hash(:sha256, blk))
+ fh = File.open!(state.path, [:read, :write, :binary])
+ :ok = :file.pwrite(fh, id * MT.block_size(), blk)
+ :file.close(fh)
+ %{state | missing: Map.delete(state.missing, id),
+ reqs: Map.delete(state.reqs, id)}
+ end
+
+ defp a_random_peer(list) do
+ case list do
+ [a] -> a
+ [a|r] ->
+ if :rand.uniform() < 0.3 do
+ a
+ else
+ a_random_peer(r)
+ end
+ end
+ end
+
+ defp get_mt(state) do
+ %MT{root: state.info.merkle_root, store: %SApp.PageStore{pid: state.store, prefer_ask: []}}
+ end
# =========
# INTERFACE
@@ -110,5 +348,6 @@ defmodule SApp.File do
manifest = %Manifest{infohash: infohash}
pid = Shard.Manager.find_or_start(manifest)
GenServer.cast(pid, {:init_with, path, infobin, mt})
+ {:ok, manifest, pid}
end
end