diff options
Diffstat (limited to 'shard')
-rw-r--r-- | shard/lib/app/chat.ex | 5 | ||||
-rw-r--r-- | shard/lib/app/directory.ex | 168 | ||||
-rw-r--r-- | shard/lib/app/identity.ex | 8 | ||||
-rw-r--r-- | shard/lib/keys.ex | 8 | ||||
-rw-r--r-- | shard/lib/manager.ex | 174 |
5 files changed, 337 insertions, 26 deletions
diff --git a/shard/lib/app/chat.ex b/shard/lib/app/chat.ex index d253030..4c7fc00 100644 --- a/shard/lib/app/chat.ex +++ b/shard/lib/app/chat.ex @@ -141,6 +141,11 @@ defmodule SApp.Chat do end end + def handle_cast(:send_deps, state) do + GenServer.cast(Shard.Manager, {:dep_list, state.id, []}) + {:noreply, state} + end + def handle_cast(:mark_read, state) do state = %{state | read: state.mst.root} save_state(state) diff --git a/shard/lib/app/directory.ex b/shard/lib/app/directory.ex new file mode 100644 index 0000000..f7d871a --- /dev/null +++ b/shard/lib/app/directory.ex @@ -0,0 +1,168 @@ +defmodule SApp.Directory do + @moduledoc""" + Shard application for a directory of other shards. + + TODO: use MST for file list instead of plain list + """ + + use GenServer + + require Logger + + defmodule Manifest do + @moduledoc""" + Manifest for a directory. This directory is owned by a user, + has a name, and can be either public or private. + """ + + defstruct [:owner, :public, :name] + + defimpl Shard.Manifest do + def module(_m), do: SApp.Directory + end + end + + def start_link(manifest) do + GenServer.start_link(__MODULE__, manifest) + end + + def init(manifest) do + %Manifest{owner: owner, public: public, name: name} = manifest + id = SData.term_hash manifest + + Shard.Manager.dispatch_to(id, nil, self()) + files = case Shard.Manager.load_state(id) do + nil -> + SData.SignRev.new %{} + st -> st + end + netgroup = case public do + true -> %SNet.PubShardGroup{id: id} + false -> %SNet.PrivGroup{pk_list: [owner]} + end + SNet.Group.init_lookup(netgroup, self()) + + {:ok, %{ + owner: owner, public: public, name: name, + manifest: manifest, id: id, netgroup: netgroup, + files: files}} + end + + def handle_call(:manifest, _from, state) do + {:reply, state.manifest, state} + end + + def handle_call(:get_files, _from, state) do + {:reply, SData.SignRev.get(state.files), state} + end + + def handle_call({:add_file, name, manifest}, _from, state) do + if Shard.Keys.have_sk?(state.owner) do + dict = SData.SignRev.get(state.files) + if dict[name] != nil and dict[name] != manifest do + {:reply, :exists_already, state} + else + dict = Map.put(dict, name, manifest) + GenServer.cast(Shard.Manager, {:dep_list, state.id, Map.values(dict)}) + {:ok, st2} = SData.SignRev.set(state.files, dict, state.owner) + Shard.Manager.save_state(state.id, st2) + state = put_in(state.files, st2) + bcast_state(state) + {:reply, :ok, state} + end + else + {:reply, :impossible, state} + end + end + + def handle_call({:rm_file, name}, _from, state) do + if Shard.Keys.have_sk?(state.owner) do + dict = SData.SignRev.get(state.files) + if dict[name] == nil do + {:reply, :not_found, state} + else + dict = Map.delete(dict, name) + GenServer.cast(Shard.Manager, {:dep_list, state.id, Map.values(dict)}) + {:ok, st2} = SData.SignRev.set(state.files, dict, state.owner) + Shard.Manager.save_state(state.id, st2) + state = put_in(state.files, st2) + bcast_state(state) + {:reply, :ok, state} + end + else + {:reply, :impossible, state} + end + end + + def handle_cast(:send_deps, state) do + dict = SData.SignRev.get(state.files) + GenServer.cast(Shard.Manager, {:dep_list, state.id, Map.values(dict)}) + {:noreply, state} + end + + def handle_cast({:interested, peer_pid, auth}, state) do + if SNet.Group.in_group?(state.netgroup, peer_pid, auth) do + SNet.Manager.send_pid(peer_pid, {state.id, nil, {:update, SData.SignRev.signed(state.files), true}}) + end + {:noreply, state} + end + + def handle_cast({:msg, conn_pid, auth, _shard_id, nil, msg}, state) do + if not SNet.Group.in_group?(state.netgroup, conn_pid, auth) do + {:noreply, state} + else + state = case msg do + {:update, signed, ask_reply} when signed != nil -> + state = case SData.SignRev.merge(state.files, signed, state.pk) do + {true, newfiles} -> + Shard.Manager.save_state(state.id, newfiles) + state = put_in(state.files, newfiles) + bcast_state(state, [conn_pid]) + state + {false, _} -> + state + end + if ask_reply do + SNet.Manager.send_pid(conn_pid, {state.id, nil, {:update, SData.SignRev.signed(state.files), false}}) + end + state + _ -> state + end + {:noreply, state} + end + end + + defp bcast_state(state, exclude \\ []) do + msg = {state.id, nil, {:update, SData.SignRev.signed(state.files), false}} + SNet.Group.broadcast(state.netgroup, msg, exclude_pid: exclude) + end + + # ================ + # PUBLIC INTERFACE + # ================ + + @doc""" + Return list of files stored in this directory. + + Returns a list of {name, manifests}. + """ + def get_files(pid) do + GenServer.call(pid, :get_files) + end + + @doc""" + Add a file to this directory. A file is a name for a shard manifest. + A file added to a directory becomes a dependency of the directory, i.e. + if the directory is pinned then all files inside are pinned as well. + """ + def add_file(pid, name, manifest) do + GenServer.call(pid, {:add_file, name, manifest}) + end + + @doc""" + Remove a named file from this directory. + """ + def rm_file(pid, name) do + GenServer.call(pid, {:rm_file, name}) + end +end diff --git a/shard/lib/app/identity.ex b/shard/lib/app/identity.ex index bacfdab..b034107 100644 --- a/shard/lib/app/identity.ex +++ b/shard/lib/app/identity.ex @@ -78,6 +78,14 @@ defmodule SApp.Identity do end end + def handle_cast(:send_deps, state) do + default_deps = [ + %SApp.Directory.Manifest{owner: state.pk, public: false, name: "friends"} + ] + GenServer.cast(Shard.Manager, {:dep_list, state.id, default_deps}) + {:noreply, state} + end + def handle_cast({:peer_connected, peer_pid}, state) do GenServer.cast(peer_pid, {:send_msg, {:interested, [state.id]}}) {:noreply, state} diff --git a/shard/lib/keys.ex b/shard/lib/keys.ex index e0180b7..412baa2 100644 --- a/shard/lib/keys.ex +++ b/shard/lib/keys.ex @@ -16,7 +16,9 @@ defmodule Shard.Keys do :dets.start {:ok, @key_db} = :dets.open_file(@key_db, [type: :set]) for [pk, _] <- :dets.match(@key_db, {:'$1', :'$2'}) do - Shard.Manager.find_or_start %SApp.Identity.Manifest{pk: pk} + m = %SApp.Identity.Manifest{pk: pk} + Shard.Manager.find_or_start m + GenServer.cast(Shard.Manager, {:pin, SData.term_hash(m)}) end nil end @@ -53,7 +55,9 @@ defmodule Shard.Keys do {pk, sk} = gen_keypair(Application.get_env(:shard, :identity_suffix)) Logger.info "New identity: #{pk|>Base.encode16}" :dets.insert @key_db, {pk, sk} - Shard.Manager.find_or_start %SApp.Identity.Manifest{pk: pk} + m = %SApp.Identity.Manifest{pk: pk} + Shard.Manager.find_or_start m + GenServer.cast(Shard.Manager, {:pin, SData.term_hash(m)}) pk end diff --git a/shard/lib/manager.ex b/shard/lib/manager.ex index 3a0e21c..990bcea 100644 --- a/shard/lib/manager.ex +++ b/shard/lib/manager.ex @@ -26,7 +26,11 @@ defmodule Shard.Manager do - :shard_db (persistent with DETS) List of - { id, manifest, state } + { id, manifest, why_have_it, state } + + why_have_it := {:pinned, %MapSet{who requires it...}, %MapSet{who it requires...}} + | {:req, %MapSet{who requires it...}, %MapSet{who it requires...}} + | {:cached, expiry_date} - :peer_db (persistent with DETS) @@ -46,6 +50,9 @@ defmodule Shard.Manager do require Logger + @cache_ttl 3600*24 # 24 hours + @clean_cache_every 60 # one minute + @shard_db [Application.get_env(:shard, :data_path), "shard_db"] |> Path.join |> String.to_atom @peer_db [Application.get_env(:shard, :data_path), "peer_db"] |> Path.join |> String.to_atom @@ -61,24 +68,13 @@ defmodule Shard.Manager do :ets.new(:shard_procs, [:set, :protected, :named_table]) + Process.send_after(self(), :clean_cache, 1000) {:ok, %{}} end def handle_call({:find_or_start, shard_id, manifest}, _from, state) do - case :dets.lookup(@shard_db, shard_id) do - [] -> :dets.insert(@shard_db, {shard_id, manifest, nil}) - _ -> nil - end - - case :ets.lookup(:shard_procs, {shard_id, nil}) do - [] -> - {:ok, pid} = apply(Shard.Manifest.module(manifest), :start_link, [manifest]) - :ets.insert(:shard_procs, {{shard_id, nil}, pid}) - state = Map.put(state, pid, {shard_id, nil}) - {:reply, pid, state} - pid -> - {:reply, pid, state} - end + {pid, state} = find_or_start(state, shard_id, manifest) + {:reply, pid, state} end def handle_cast({:dispatch_to, shard_id, path, pid}, state) do @@ -100,6 +96,84 @@ defmodule Shard.Manager do {:noreply, state} end + def handle_cast({:save_state, shard_id, shst}, state) do + case :dets.lookup(@shard_db, shard_id) do + [{^shard_id, manifest, why_have_it, _old_state}] -> + :dets.insert(@shard_db, {shard_id, manifest, why_have_it, shst}) + end + {:noreply, state} + end + + def handle_cast({:pin, shard_id}, state) do + case :dets.lookup(@shard_db, shard_id) do + [{^shard_id, manifest, {:cached, _}, shst}] -> + :dets.insert(@shard_db, {shard_id, manifest, {:pinned, %MapSet{}, %MapSet{}}, shst}) + {pid, state} = find_or_start(state, shard_id, manifest) + GenServer.cast(pid, :send_deps) + {:noreply, state} + [{^shard_id, manifest, {:req, a, b}, shst}] -> + :dets.insert(@shard_db, {shard_id, manifest, {:pinned, a, b}, shst}) + {:noreply, state} + _ -> + {:noreply, state} + end + end + + def handle_cast({:unpin, shard_id}, state) do + case :dets.lookup(@shard_db, shard_id) do + [{^shard_id, manifest, {:pinned, a, b}, shst}] -> + if MapSet.size(a) > 0 do + :dets.insert(@shard_db, {shard_id, manifest, {:req, a, b}, shst}) + else + for dep <- b do + rm_dep_link(shard_id, dep) + end + :dets.insert(@shard_db, {shard_id, manifest, cached(), shst}) + end + _ -> nil + end + {:noreply, state} + end + + def handle_cast({:dep_list, shard_id, manifests}, state) do + case :dets.lookup(@shard_db, shard_id) do + [{^shard_id, manifest, {reason, a, b}, shst}] when reason == :pinned or reason == :req -> + bnew_pairs = Enum.map(manifests, fn m -> {SData.term_hash(m), m} end) + bnew_map = Enum.reduce(bnew_pairs, %{}, fn {id, m}, map -> Map.put(map, id, m) end) + bnew_set = Enum.reduce(bnew_pairs, %MapSet{}, fn {id, _m}, ms -> MapSet.put(ms, id) end) + state = MapSet.difference(bnew_set, b) + |> Enum.reduce(state, fn idadd, state -> + add_dep_link(state, shard_id, idadd, bnew_map[idadd]) + end) + for idrm <- MapSet.difference(b, bnew_set) do + rm_dep_link(shard_id, idrm) + end + :dets.insert(@shard_db, {shard_id, manifest, {reason, a, bnew_set}, shst}) + {:noreply, state} + _ -> + {:noreply, state} + end + end + + def handle_info(:clean_cache, state) do + currtime = System.os_time :seconds + + shards = :dets.select(@shard_db, [{ + {:'$1', :_, {:cached, :'$2'}, :_}, [{:<, :'$2', currtime}], [:'$1']} + ]) + for [id] <- shards do + case :ets.lookup(:shard_procs, {id, nil}) do + [{{^id, nil}, pid}] -> + GenServer.cast(pid, :delete_shard) + _ -> nil + end + :dets.delete(@shard_db, id) + end + + Process.send_after(self(), :clean_cache, @clean_cache_every * 1000) + {:noreply, state} + end + def handle_info({:DOWN, _, :process, pid, reason}, state) do handle_info({:EXIT, pid, reason}, state) end @@ -114,6 +188,59 @@ defmodule Shard.Manager do end end + defp find_or_start(state, shard_id, manifest) do + case :dets.lookup(@shard_db, shard_id) do + [] -> + :dets.insert(@shard_db, {shard_id, manifest, cached(), nil}) + [{^shard_id, ^manifest, {:cached, _}, shst}] -> + :dets.insert(@shard_db, {shard_id, manifest, cached(), shst}) + _ -> nil + end + + case :ets.lookup(:shard_procs, {shard_id, nil}) do + [] -> + {:ok, pid} = apply(Shard.Manifest.module(manifest), :start_link, [manifest]) + :ets.insert(:shard_procs, {{shard_id, nil}, pid}) + state = Map.put(state, pid, {shard_id, nil}) + {pid, state} + [{{^shard_id, nil}, pid}] -> + {pid, state} + end + end + + defp cached() do + {:cached, System.os_time(:seconds) + @cache_ttl} + end + + defp add_dep_link(state, shard_id, id2, m2) do + case :dets.lookup(@shard_db, id2) do + [{^id2, ^m2, {reason, a, b}, shst}] when reason == :pinned or reason == :req -> + :dets.insert(@shard_db, {id2, m2, {reason, MapSet.put(a, shard_id), b}, shst}) + state + _ -> + a = MapSet.new() |> MapSet.put(shard_id) + :dets.insert(@shard_db, {id2, m2, {:req, a, %MapSet{}}, nil}) + {pid, state} = find_or_start(state, id2, m2) + GenServer.cast(pid, :send_deps) + state + end + end + + defp rm_dep_link(shard_id, id2) do + case :dets.lookup(@shard_db, id2) do + [{^id2, m2, {reason, a, b}, shst}] when reason == :pinned or reason == :req -> + a2 = MapSet.delete(a, shard_id) + if reason == :req and MapSet.size(a2) == 0 do + :dets.insert(@shard_db, {id2, m2, cached(), shst}) + for dep <- b do + rm_dep_link(id2, dep) + end + else + :dets.insert(@shard_db, {id2, m2, {reason, a2, b}, shst}) + end + end + end + # ====================== # CALLED BY SNet.TcpConn @@ -125,7 +252,7 @@ defmodule Shard.Manager do def incoming(conn_pid, peer_info, auth, {:interested, shards}) do for shard_id <- shards do case :dets.lookup(@shard_db, shard_id) do - [{ ^shard_id, manifest, _ }] -> + [{ ^shard_id, manifest, _, _ }] -> GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info}) pid = case :ets.lookup(:shard_procs, {shard_id, nil}) do [] -> @@ -146,7 +273,7 @@ defmodule Shard.Manager do case :dets.lookup(@shard_db, shard_id) do [] -> GenServer.cast(conn_pid, {:send_msg, {:not_interested, shard_id}}) - [{ ^shard_id, manifest, _}] -> + [{ ^shard_id, manifest, _, _}] -> GenServer.cast(__MODULE__, {:peer_db_insert, shard_id, peer_info}) pid = case :ets.lookup(:shard_procs, {shard_id, path}) do [] -> @@ -190,7 +317,7 @@ defmodule Shard.Manager do """ def load_state(shard_id) do case :dets.lookup(@shard_db, shard_id) do - [{^shard_id, _, state}] -> state + [{^shard_id, _, _, state}] -> state _ -> nil end end @@ -199,10 +326,7 @@ defmodule Shard.Manager do Save a state value for a shard """ def save_state(shard_id, state) do - case :dets.lookup(@shard_db, shard_id) do - [{^shard_id, manifest, _old_state}] -> - :dets.insert(@shard_db, {shard_id, manifest, state}) - end + GenServer.cast(__MODULE__, {:save_state, shard_id, state}) end @@ -234,9 +358,11 @@ defmodule Shard.Manager do end @doc""" - Return the list of all shards. + Return the list of all shards. Returns a list of tuples: + + {id, manifest, why_have_it} """ def list_shards() do - for [x] <- :dets.match(@shard_db, :"$1"), do: x + for [{id, m, why, _}] <- :dets.match(@shard_db, :"$1"), do: {id, m, why} end end |