1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
defmodule SApp.BlockStore do
  @moduledoc """
  A module that implements a content-addressable storage (blocks, or pages,
  identified by the hash of their contents).

  This is not a shard, it is a side process that a shard may use to store its data.

  Blocks are kept in the process state. When a block is requested that is not
  held locally, the process asks peers for it through `Shard.Manager` and
  replies to the caller once a value whose hash matches the requested key is
  available, or with `nil` once peers have repeatedly answered `:not_found`.

  TODO: WIP
  """

  use GenServer

  # A missing block is given up on when this many `:not_found` answers have
  # been received for it.
  @not_found_limit 3

  # Number of randomly chosen peer entries asked for a missing block.
  @peers_per_request 3

  # Client-side handle, used through the `SData.PageStore` implementation below:
  #   * `:pid`        - the block store process that calls are sent to
  #   * `:prefer_ask` - when a non-empty list, the peers asked for a missing
  #                     block instead of randomly chosen ones
  @enforce_keys [:pid]
  defstruct [:pid, :prefer_ask]

  defmodule State do
    @moduledoc false

    # Server state:
    #   * `:shard_id`, `:path` - values given to `start_link/2`; they tag every
    #                            message sent to peers
    #   * `:store`             - map from block hash to block value
    #   * `:reqs`              - map from block hash to the `MapSet` of callers
    #                            (`GenServer` `from` values) waiting for it
    #   * `:retries`           - map from block hash to the number of
    #                            `:not_found` answers received so far
    defstruct [:shard_id, :path, :store, :reqs, :retries]
  end

  @doc """
  Starts a block store process for shard `shard_id` and sub-path `path`.
  """
  def start_link(shard_id, path) do
    GenServer.start_link(__MODULE__, [shard_id, path])
  end

  @impl true
  def init([shard_id, path]) do
    # NOTE(review): presumably registers this process as the receiver of peer
    # messages addressed to {shard_id, path} - confirm in Shard.Manager.
    Shard.Manager.dispatch_to(shard_id, path, self())
    {:ok, %State{shard_id: shard_id, path: path, store: %{}, reqs: %{}, retries: %{}}}
  end

  # {:get, key, prefer_ask}: replies with the block right away when it is held
  # locally. Otherwise requests it from peers and parks the caller in `reqs`;
  # the reply is sent later from the peer-message handlers or from a local put.
  #
  # {:put, val}: stores `val` under its hash and replies with that hash.
  @impl true
  def handle_call({:get, key, prefer_ask}, from, state) do
    case state.store[key] do
      nil ->
        request_block(state, key, prefer_ask)
        waiters = MapSet.put(state.reqs[key] || MapSet.new(), from)
        {:noreply, %{state | reqs: Map.put(state.reqs, key, waiters)}}

      value ->
        {:reply, value, state}
    end
  end

  def handle_call({:put, val}, _from, state) do
    hash = SData.term_hash(val)
    # Going through store_block/3 also answers callers that were already
    # waiting for this hash, instead of leaving them to time out.
    {:reply, hash, store_block(state, hash, val)}
  end

  # Messages coming from peers; the actual work is done by handle_peer_msg/3.
  @impl true
  def handle_cast({:msg, peer_id, _shard_id, _path, msg}, state) do
    {:noreply, handle_peer_msg(msg, peer_id, state)}
  end

  @doc """
  Sends a `{:get, key}` request to up to #{@peers_per_request} entries picked
  at random from those stored under `state.shard_id` in the `:shard_peer_db`
  ETS table.

  Returns the list of results of the individual `Shard.Manager.send/2` calls.
  """
  def ask_random_peers(state, key) do
    peers =
      :shard_peer_db
      |> :ets.lookup(state.shard_id)
      |> Enum.shuffle()
      |> Enum.take(@peers_per_request)

    # NOTE(review): :ets.lookup/2 returns whole table objects (tuples), so each
    # `peer` below is the stored tuple rather than a bare peer id. Confirm that
    # Shard.Manager.send/2 expects that shape, or destructure the tuple here.
    for peer <- peers do
      Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
    end
  end

  # Asks the preferred peers when a non-empty list was given, random ones otherwise.
  defp request_block(state, key, [_ | _] = preferred_peers) do
    Enum.each(preferred_peers, fn peer ->
      Shard.Manager.send(peer, {state.shard_id, state.path, {:get, key}})
    end)
  end

  defp request_block(state, key, _no_preference) do
    ask_random_peers(state, key)
  end

  # A peer asks for a block: answer with the block or with :not_found.
  defp handle_peer_msg({:get, key}, peer_id, state) do
    answer =
      case state.store[key] do
        nil -> {:not_found, key}
        value -> {:info, key, value}
      end

    Shard.Manager.send(peer_id, {state.shard_id, state.path, answer})
    state
  end

  # A peer delivers a block: accept it only when its content hashes to the
  # announced key. The parentheses matter: `SData.term_hash value == hash`
  # would hash the result of the comparison and make this check always pass.
  defp handle_peer_msg({:info, hash, value}, _peer_id, state) do
    if SData.term_hash(value) == hash do
      store_block(state, hash, value)
    else
      state
    end
  end

  # A peer does not have the block: ask other peers again, or give up and
  # answer `nil` to the waiting callers once the limit is reached. Ignored
  # when nobody waits for the block or when it has been obtained meanwhile.
  defp handle_peer_msg({:not_found, key}, _peer_id, state) do
    waiters = state.reqs[key]

    if waiters != nil and state.store[key] == nil do
      not_found_count = Map.get(state.retries, key, 0) + 1

      if not_found_count < @not_found_limit do
        ask_random_peers(state, key)
        %{state | retries: Map.put(state.retries, key, not_found_count)}
      else
        Enum.each(waiters, &GenServer.reply(&1, nil))

        %{
          state
          | reqs: Map.delete(state.reqs, key),
            retries: Map.delete(state.retries, key)
        }
      end
    else
      state
    end
  end

  # Records `value` under `hash`, replies to every caller waiting for that
  # hash and drops the request/retry bookkeeping for it.
  defp store_block(state, hash, value) do
    state.reqs
    |> Map.get(hash, [])
    |> Enum.each(&GenServer.reply(&1, value))

    %{
      state
      | store: Map.put(state.store, hash, value),
        reqs: Map.delete(state.reqs, hash),
        retries: Map.delete(state.retries, hash)
    }
  end

  defimpl SData.PageStore do
    # Stores `page` in the block store process; returns `{hash, store}`.
    def put(store, page) do
      hash = GenServer.call(store.pid, {:put, page})
      {hash, store}
    end

    # Fetches the page stored under `hash`. Returns `nil` when the store gave
    # up on finding it or when the call timed out.
    def get(store, hash) do
      GenServer.call(store.pid, {:get, hash, store.prefer_ask})
    catch
      :exit, {:timeout, _} -> nil
    end

    # Copies the page `hash` and, recursively, every page it references from
    # `other_store` into `store`. Returns `store`.
    #
    # NOTE(review): `get/2` on `other_store` may return `nil`; that case is not
    # handled here and `nil` is passed on to `SData.Page.refs/1` - confirm
    # whether a missing page should abort the copy.
    def copy(store, other_store, hash) do
      page = SData.PageStore.get(other_store, hash)

      page
      |> SData.Page.refs()
      |> Enum.each(&copy(store, other_store, &1))

      GenServer.call(store.pid, {:put, page})
      store
    end

    # Freeing is not implemented yet: the store is returned unchanged.
    def free(store, _hash) do
      store ## DO SOMETHING???
    end
  end
end
|