From 72906c6bb473ea605235c84b6d01c318f7b6cef8 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Nov 2018 15:03:31 +0100 Subject: File shard quite complete (but not perfect) --- shard/lib/app/chat.ex | 3 - shard/lib/app/directory.ex | 7 +- shard/lib/app/file.ex | 261 +++++++++++++++++++++++++++++++++++++++++-- shard/lib/data/merkletree.ex | 11 +- shard/lib/shard_uri.ex | 46 ++++++++ 5 files changed, 310 insertions(+), 18 deletions(-) create mode 100644 shard/lib/shard_uri.ex (limited to 'shard/lib') diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex index 405210b..2795153 100644 --- a/shard/lib/app/chat.ex +++ b/shard/lib/app/chat.ex @@ -226,9 +226,6 @@ defmodule SApp.Chat do {:noreply, state} else state = case msg do - {:get_manifest} -> - SNet.Manager.send_pid(conn_pid, {state.id, nil, {:manifest, state.manifest}}) - state {:append, prev_root, msgitem, new_root} -> # Append message: one single mesage has arrived if new_root == state.mst.root do diff --git a/shard/lib/app/directory.ex b/shard/lib/app/directory.ex index 96e3a2b..e9482f7 100644 --- a/shard/lib/app/directory.ex +++ b/shard/lib/app/directory.ex @@ -145,6 +145,11 @@ defmodule SApp.Directory do {:noreply, state} end + def handle_cast({:peer_connected, peer_pid}, state) do + SNet.Manager.send_pid(peer_pid, {:interested, [state.id]}) + {:noreply, state} + end + def handle_cast({:interested, peer_pid, auth}, state) do if SNet.Group.in_group?(state.netgroup, peer_pid, auth) do SNet.Manager.send_pid(peer_pid, {state.id, nil, {:update, SData.SignRev.signed(state.items), true}}) @@ -197,9 +202,7 @@ defmodule SApp.Directory do defp send_deps(state) do dict = SData.SignRev.get(state.items) - IO.puts("items: #{inspect dict}") deps = for {_, {m, stored}} <- dict, stored, do: m - IO.puts("stored: #{inspect deps}") GenServer.cast(Shard.Manager, {:dep_list, state.id, deps}) end diff --git a/shard/lib/app/file.ex b/shard/lib/app/file.ex index a874222..b28f742 100644 --- a/shard/lib/app/file.ex +++ 
b/shard/lib/app/file.ex @@ -20,6 +20,9 @@ defmodule SApp.File do alias SData.MerkleTree, as: MT + @concurrent_reqs 10 + @req_timeout_msec 2000 + defmodule Manifest do @moduledoc""" Manifest for a file. @@ -43,7 +46,7 @@ defmodule SApp.File do end defmodule State do - defstruct [:infohash, :id, :manifest, :netgroup, :info, :infobin, :store, :missing, :path] + defstruct [:infohash, :id, :manifest, :netgroup, :info, :infobin, :store, :missing, :path, :reqs] end def start_link(manifest) do @@ -53,39 +56,274 @@ defmodule SApp.File do def init(manifest) do %Manifest{infohash: infohash} = manifest id = SData.term_hash manifest - Shard.Manager.dispatch_to(id, nil, self()) - {infobin, info} = case Shard.Manager.load_state(id) do - nil -> {nil, nil} - infobin -> {infobin, SData.term_unbin infobin} - end + netgroup = %SNet.PubShardGroup{id: id} SNet.Group.init_lookup(netgroup, self()) path = [Application.get_env(:shard, :data_path), "#{id|>Base.encode16}"] |> Path.join - {:ok, store} = SApp.PageStore.start_link(id, :meta, netgroup) + {infobin, info} = case Shard.Manager.load_state(id) do + nil -> + {nil, nil} + infobin -> + info = SData.term_unbin(infobin) + Process.send_after(self(), {:calc_missing, 1}, 100) + GenServer.cast(store, {:set_roots, [info.merkle_root]}) + {infobin, info} + end + {:ok, %State{ id: id, infohash: infohash, manifest: manifest, netgroup: netgroup, - infobin: infobin, info: info, store: store, missing: nil, path: path + infobin: infobin, info: info, store: store, missing: nil, path: path, reqs: %{} }} end + def handle_call(:manifest, _from, state) do + {:reply, state.manifest, state} + end + + def handle_call(:get_info, _from, state) do + reply = cond do + state.info != nil and GenServer.call(state.store, {:have_rec, state.info.merkle_root}) -> + mt = get_mt(state) + nblk = MT.block_count(mt) + [infohash: state.infohash, + file_hash: state.info.file_hash, + size: state.info.size, + mime_type: state.info.mime_type, + num_blocks: nblk, + missing_blocks: 
if state.missing != nil do map_size(state.missing) else nil end] + state.info != nil -> + [infohash: state.infohash, + file_hash: state.info.file_hash, + size: state.info.size, + mime_type: state.info.mime_type] + true -> + [infohash: state.infohash] + end + {:reply, reply, state} + end + + def handle_cast(:send_deps, state) do + GenServer.cast(Shard.Manager, {:dep_list, state.id, []}) + {:noreply, state} + end + def handle_cast({:init_with, file_path, infobin, mt}, state) do + true = (SData.bin_hash(infobin) == state.infohash) + Shard.Manager.save_state(state.id, infobin) info = SData.term_unbin(infobin) - for {k, v} <- mt.store do - {^k, _} = SData.PageStore.put(state.store, v) + for {k, v} <- mt.store.pages do + {^k, _} = SData.PageStore.put(%SApp.PageStore{pid: state.store, prefer_ask: []}, v) end File.copy!(file_path, state.path) new_state = %{state | infobin: infobin, info: info, + missing: %{}, + reqs: %{}, } {:noreply, new_state} end - # TODO networking etc + def handle_cast({:peer_connected, conn_pid}, state) do + SNet.Manager.send_pid(conn_pid, {:interested, [state.id]}) + {:noreply, state} + end + + def handle_cast({:interested, conn_pid, auth}, state) do + if SNet.Group.in_group?(state.netgroup, conn_pid, auth) do + cond do + state.info == nil -> + SNet.Manager.send_pid(conn_pid, {state.id, nil, {:get_info}}) + state.missing != nil -> + SNet.Manager.send_pid(conn_pid, {state.id, nil, {:info, state.infobin}}) + SNet.Manager.send_pid(conn_pid, {state.id, nil, {:missing, Map.keys(state.missing), true}}) + true -> + nil + end + end + {:noreply, state} + end + + def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do + if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do + # Ignore message + {:noreply, state} + else + Logger.info("<- #{inspect msg}") + state = case msg do + {:get_info} -> + if state.infobin != nil do + SNet.Manager.send_pid(conn_pid, {state.id, nil, {:info, state.infobin}}) + end + state + {:info, infobin} -> + if 
state.infobin == nil and SData.bin_hash(infobin) == state.infohash do + Shard.Manager.save_state(state.id, infobin) + state = %{state | infobin: infobin, info: SData.term_unbin(infobin)} + GenServer.cast(state.store, {:set_roots, [state.info.merkle_root]}) + Process.send_after(self(), {:calc_missing, 1}, 100) + state + else + state + end + {:missing, missing_list, rep} -> + if state.missing != nil do + have_list = missing_list |> Enum.filter(&(state.missing[&1] == nil)) + SNet.Manager.send_pid(conn_pid, {state.id, nil, {:have, have_list}}) + if rep and map_size(state.missing) > 0 do + SNet.Manager.send_pid(conn_pid, {state.id, nil, {:missing, Map.keys(state.missing), false}}) + end + end + state + {:have, have_list} -> + peer_info = GenServer.call(conn_pid, :get_peer_info) + missing = Enum.reduce(have_list, state.missing, fn id, missing -> + if missing[id] != nil and peer_info not in missing[id] do + new_have = [peer_info | missing[id]] + put_in(missing[id], Enum.take(new_have, 10)) + else + missing + end + end) + state = %{state | missing: missing} + send_reqs(state) + {:get, id} -> + if state.missing[id] == nil do + piece = get_piece(state, id) + SNet.Manager.send_pid(conn_pid, {state.id, nil, {:piece, id, piece}}) + else + SNet.Manager.send_pid(conn_pid, {state.id, nil, {:dont_have, id}}) + end + state + {:piece, id, blk} -> + mt = get_mt(state) + if state.missing[id] != nil and MT.get(mt, id) == :crypto.hash(:sha256, blk) do + state = put_piece(state, id, blk) + if map_size(state.missing) == 0 do + send(self(), {:calc_missing, 1}) + end + state = put_in(state.reqs, Map.delete(state.reqs, id)) + send_reqs(state) + else + state + end + end + {:noreply, state} + end + end + + def handle_info({:calc_missing, iter}, state) do + if state.info != nil do + Logger.info(":calc_missing for #{state.id|>Base.encode16}") + missing = case GenServer.call(state.store, {:have_rec, state.info.merkle_root}) do + true -> + meta = get_mt(state) + n_blocks = MT.block_count(meta) + 
expected_hashes = MT.get_range(meta, 0..(n_blocks-1)) + actual_hashes = if File.exists?(state.path) do + File.stream!(state.path, [], MT.block_size()) + |> Enum.map(&(:crypto.hash(:sha256, &1))) + else + [] + end + missing_1 = Enum.zip([expected_hashes, actual_hashes, 0..n_blocks]) + |> Enum.filter(fn {a, b, _} -> a != b end) + |> Enum.map(&(elem(&1, 2))) + missing_2 = if Enum.count(actual_hashes) < n_blocks do Enum.to_list((Enum.count actual_hashes)..(n_blocks-1)) else [] end + missing = for k <- missing_1 ++ missing_2, into: %{} do + if state.missing != nil and state.missing[k] != nil do + {k, state.missing[k]} + else + {k, []} + end + end + Logger.info("Missing pieces: #{map_size missing} / #{n_blocks}") + missing + false -> + Logger.info("Incomplete Merkle tree meta data, requesting info from peers.") + GenServer.cast(state.store, {:set_roots, [state.info.merkle_root]}) + Process.send_after(self(), {:calc_missing, iter + 1}, iter * 1000) + nil + end + state = %{state | missing: missing} + state = if missing != nil do + SNet.Group.broadcast(state.netgroup, {state.id, nil, {:missing, Map.keys(missing), false}}) + send_reqs(state) + else + state + end + {:noreply, state} + else + Logger.info(":calc_missing for #{state.infohash|>Base.encode16} -> no info") + {:noreply, state} + end + end + + def handle_info({:req_timeout, id}, state) do + if state.reqs[id] != nil do + state = put_in(state.reqs, Map.delete(state.reqs, id)) + state = send_reqs(state) + {:noreply, state} + else + {:noreply, state} + end + end + + defp send_reqs(state) do + can_req = for {k, v} <- state.missing, v != [], state.reqs[k] == nil, do: k + n_curr_req = map_size(state.reqs) + + if can_req != [] and n_curr_req < @concurrent_reqs do + pieces = can_req |> Enum.sort() |> Enum.take(@concurrent_reqs - n_curr_req) + Enum.reduce(pieces, state, fn id, state -> + who = a_random_peer(state.missing[id]) + Logger.info("Req #{id} to #{inspect who}") + SNet.Manager.send(who, {state.id, nil, {:get, id}}) + 
Process.send_after(self(), {:req_timeout, id}, @req_timeout_msec) + put_in(state.reqs[id], who) + end) + else + state + end + end + + defp get_piece(state, id) do + nil = state.missing[id] + fh = File.open!(state.path, [:read, :binary]) + {:ok, blk} = :file.pread(fh, id * MT.block_size(), MT.block_size()) + :file.close(fh) + blk + end + + defp put_piece(state, id, blk) do + mt = get_mt(state) + true = (MT.get(mt, id) == :crypto.hash(:sha256, blk)) + fh = File.open!(state.path, [:read, :write, :binary]) + :ok = :file.pwrite(fh, id * MT.block_size(), blk) + :file.close(fh) + %{state | missing: Map.delete(state.missing, id), + reqs: Map.delete(state.reqs, id)} + end + + defp a_random_peer(list) do + case list do + [a] -> a + [a|r] -> + if :rand.uniform() < 0.3 do + a + else + a_random_peer(r) + end + end + end + + defp get_mt(state) do + %MT{root: state.info.merkle_root, store: %SApp.PageStore{pid: state.store, prefer_ask: []}} + end # ========= # INTERFACE @@ -110,5 +348,6 @@ defmodule SApp.File do manifest = %Manifest{infohash: infohash} pid = Shard.Manager.find_or_start(manifest) GenServer.cast(pid, {:init_with, path, infobin, mt}) + {:ok, manifest, pid} end end diff --git a/shard/lib/data/merkletree.ex b/shard/lib/data/merkletree.ex index 90361a3..73679cf 100644 --- a/shard/lib/data/merkletree.ex +++ b/shard/lib/data/merkletree.ex @@ -5,8 +5,8 @@ defmodule SData.MerkleTree do alias SData.PageStore, as: Store - @block_size 4096 - @tree_arity 64 + @block_size 8192 + @tree_arity 256 defstruct [:root, :store] @@ -24,6 +24,13 @@ defmodule SData.MerkleTree do end end + @doc""" + Get the block size used by merkle trees + """ + def block_size() do + @block_size + end + @doc""" Create a Merkle tree for indexing a file.
""" diff --git a/shard/lib/shard_uri.ex b/shard/lib/shard_uri.ex new file mode 100644 index 0000000..1b186d2 --- /dev/null +++ b/shard/lib/shard_uri.ex @@ -0,0 +1,46 @@ +defmodule ShardURI do + @moduledoc""" + Convert Shard manifests to and from text strings. + """ + + def from_manifest(m) do + case m do + %SApp.Chat.Manifest{channel: chan} -> "shard:chat:#{chan}" + %SApp.Chat.PrivChat.Manifest{pk_list: pk_list} -> + "shard:privchat:#{pk_list|>Enum.map(&Base.encode16/1)|>Enum.join(",")}" + %SApp.Identity.Manifest{pk: pk} -> + "shard:identity:#{pk|>Base.encode16}" + %SApp.Directory.Manifest{owner: owner, public: true, name: name} -> + "shard:dir:pub:#{owner|>Base.encode16}:#{name}" + %SApp.Directory.Manifest{owner: owner, public: false, name: name} -> + "shard:dir:priv:#{owner|>Base.encode16}:#{name}" + %SApp.File.Manifest{infohash: infohash} -> + "shard:file:#{infohash|>Base.encode16}" + end + end + + def to_manifest(p) do + case p do + "shard:chat:" <> chan -> + %SApp.Chat.Manifest{channel: chan} + "shard:privchat:" <> pklist -> + pklist + |> String.split(",") + |> Enum.map(&parse_pk/1) + |> SApp.Chat.PrivChat.Manifest.new() + "shard:identity:" <> pk -> + %SApp.Identity.Manifest{pk: parse_pk pk} + "shard:dir:pub:" <> <<pk::bytes-size(64)>> <> ":" <> name -> + %SApp.Directory.Manifest{owner: parse_pk(pk), public: true, name: name} + "shard:dir:priv:" <> <<pk::bytes-size(64)>> <> ":" <> name -> + %SApp.Directory.Manifest{owner: parse_pk(pk), public: false, name: name} + "shard:file:" <> <<infohash::binary>> -> + %SApp.File.Manifest{infohash: Base.decode16!(infohash)} + end + end + + def parse_pk(pkstr) do + 64 = byte_size pkstr + Base.decode16! pkstr + end +end -- cgit v1.2.3