aboutsummaryrefslogtreecommitdiff
path: root/src/table/gc.rs
blob: e88433399685e13f909b0f4f69e189a90cac80f8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
use std::collections::HashMap;
use std::convert::TryInto;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;

use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use tokio::sync::watch;

use garage_db as db;

use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;

use garage_rpc::system::System;
use garage_rpc::*;

use crate::data::*;
use crate::replication::*;
use crate::schema::*;

const TABLE_GC_BATCH_SIZE: usize = 1024;
const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);

// GC delay for table entries: 1 day (24 hours)
// (the delay before the entry is added in the GC todo list
// and the moment the garbage collection actually happens)
const TABLE_GC_DELAY: Duration = Duration::from_secs(24 * 3600);

pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> {
	system: Arc<System>,
	data: Arc<TableData<F, R>>,

	endpoint: Arc<Endpoint<GcRpc, Self>>,
}

#[derive(Serialize, Deserialize)]
enum GcRpc {
	Update(Vec<ByteBuf>),
	DeleteIfEqualHash(Vec<(ByteBuf, Hash)>),
	Ok,
}

impl Rpc for GcRpc {
	type Response = Result<GcRpc, Error>;
}

impl<F, R> TableGc<F, R>
where
	F: TableSchema + 'static,
	R: TableReplication + 'static,
{
	pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
		let endpoint = system
			.netapp
			.endpoint(format!("garage_table/gc.rs/Rpc:{}", F::TABLE_NAME));

		let gc = Arc::new(Self {
			system: system.clone(),
			data,
			endpoint,
		});

		gc.endpoint.set_handler(gc.clone());

		let gc1 = gc.clone();
		system.background.spawn_worker(
			format!("GC loop for {}", F::TABLE_NAME),
			move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit),
		);

		gc
	}

	async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
		while !*must_exit.borrow() {
			match self.gc_loop_iter().await {
				Ok(None) => {
					// Stuff was done, loop immediately
				}
				Ok(Some(wait_delay)) => {
					// Nothing was done, wait specified delay.
					select! {
						_ = tokio::time::sleep(wait_delay).fuse() => {},
						_ = must_exit.changed().fuse() => {},
					}
				}
				Err(e) => {
					warn!("({}) Error doing GC: {}", F::TABLE_NAME, e);
				}
			}
		}
	}

	async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
		let now = now_msec();

		// List entries in the GC todo list
		// These entries are put there when a tombstone is inserted in the table
		// (see update_entry in data.rs)
		let mut candidates = vec![];
		for entry_kv in self.data.gc_todo.iter()? {
			let (k, vhash) = entry_kv?;
			let todo_entry = GcTodoEntry::parse(&k, &vhash);

			if todo_entry.deletion_time() > now {
				if candidates.is_empty() {
					// If the earliest entry in the todo list shouldn't yet be processed,
					// return a duration to wait in the loop
					return Ok(Some(Duration::from_millis(
						todo_entry.deletion_time() - now,
					)));
				} else {
					// Otherwise we have some entries to process, do a normal iteration.
					break;
				}
			}

			candidates.push(todo_entry);
			if candidates.len() >= 2 * TABLE_GC_BATCH_SIZE {
				break;
			}
		}

		let mut entries = vec![];
		let mut excluded = vec![];
		for mut todo_entry in candidates {
			// Check if the tombstone is still the current value of the entry.
			// If not, we don't actually want to GC it, and we will remove it
			// from the gc_todo table later (below).
			let vhash = todo_entry.value_hash;
			todo_entry.value = self
				.data
				.store
				.get(&todo_entry.key[..])?
				.filter(|v| blake2sum(&v[..]) == vhash)
				.map(|v| v.to_vec());

			if todo_entry.value.is_some() {
				entries.push(todo_entry);
				if entries.len() >= TABLE_GC_BATCH_SIZE {
					break;
				}
			} else {
				excluded.push(todo_entry);
			}
		}

		// Remove from gc_todo entries for tombstones where we have
		// detected that the current value has changed and
		// is no longer a tombstone.
		for entry in excluded {
			entry.remove_if_equal(&self.data.gc_todo)?;
		}

		// Remaining in `entries` is the list of entries we want to GC,
		// and for which they are still currently tombstones in the table.

		if entries.is_empty() {
			// Nothing to do in this iteration (no entries present)
			// Wait for a default delay of 60 seconds
			return Ok(Some(Duration::from_secs(60)));
		}

		debug!("({}) GC: doing {} items", F::TABLE_NAME, entries.len());

		// Split entries to GC by the set of nodes on which they are stored.
		// Here we call them partitions but they are not exactly
		// the same as partitions as defined in the ring: those partitions
		// are defined by the first 8 bits of the hash, but two of these
		// partitions can be stored on the same set of nodes.
		// Here we detect when entries are stored on the same set of nodes:
		// even if they are not in the same 8-bit partition, we can still
		// handle them together.
		let mut partitions = HashMap::new();
		for entry in entries {
			let pkh = Hash::try_from(&entry.key[..32]).unwrap();
			let mut nodes = self.data.replication.write_nodes(&pkh);
			nodes.retain(|x| *x != self.system.id);
			nodes.sort();

			if !partitions.contains_key(&nodes) {
				partitions.insert(nodes.clone(), vec![]);
			}
			partitions.get_mut(&nodes).unwrap().push(entry);
		}

		// For each set of nodes that contains some items,
		// ensure they are aware of the tombstone status, and once they
		// are, instruct them to delete the entries.
		let resps = join_all(
			partitions
				.into_iter()
				.map(|(nodes, items)| self.try_send_and_delete(nodes, items)),
		)
		.await;

		// Collect errors and return a single error value even if several
		// errors occurred.
		let mut errs = vec![];
		for resp in resps {
			if let Err(e) = resp {
				errs.push(e);
			}
		}

		if errs.is_empty() {
			Ok(None)
		} else {
			Err(Error::Message(
				errs.into_iter()
					.map(|x| format!("{}", x))
					.collect::<Vec<_>>()
					.join(", "),
			))
			.err_context("in try_send_and_delete in table GC:")
		}
	}

	async fn try_send_and_delete(
		&self,
		nodes: Vec<Uuid>,
		mut items: Vec<GcTodoEntry>,
	) -> Result<(), Error> {
		let n_items = items.len();

		// Strategy: we first send all of the values to the remote nodes,
		// to ensure that they are aware of the tombstone state,
		// and that the previous state was correctly overwritten
		// (if they have a newer state that overrides the tombstone, that's fine).
		// Second, once everyone is at least at the tombstone state,
		// we instruct everyone to delete the tombstone IF that is still their current state.
		// If they are now at a different state, it means that that state overrides the
		// tombstone in the CRDT lattice, and it will be propagated back to us at some point
		// (either just a regular update that hasn't reached us yet, or later when the
		// table is synced).

		// Here, we store in updates all of the tombstones to send for step 1,
		// and in deletes the list of keys and hashes of value for step 2.
		let mut updates = vec![];
		let mut deletes = vec![];
		for item in items.iter_mut() {
			updates.push(ByteBuf::from(item.value.take().unwrap()));
			deletes.push((ByteBuf::from(item.key.clone()), item.value_hash));
		}

		// Step 1: ensure everyone is at least at tombstone in CRDT lattice
		// Here the quorum is nodes.len(): we cannot tolerate even a single failure,
		// otherwise old values before the tombstone might come back in the data.
		// GC'ing is not a critical function of the system, so it's not a big
		// deal if we can't do it right now.
		self.system
			.rpc
			.try_call_many(
				&self.endpoint,
				&nodes[..],
				GcRpc::Update(updates),
				RequestStrategy::with_priority(PRIO_BACKGROUND)
					.with_quorum(nodes.len())
					.with_timeout(TABLE_GC_RPC_TIMEOUT),
			)
			.await
			.err_context("GC: send tombstones")?;

		info!(
			"({}) GC: {} items successfully pushed, will try to delete.",
			F::TABLE_NAME,
			n_items
		);

		// Step 2: delete tombstones everywhere.
		// Here we also fail if even a single node returns a failure:
		// it means that the garbage collection wasn't completed and has
		// to be retried later.
		self.system
			.rpc
			.try_call_many(
				&self.endpoint,
				&nodes[..],
				GcRpc::DeleteIfEqualHash(deletes),
				RequestStrategy::with_priority(PRIO_BACKGROUND)
					.with_quorum(nodes.len())
					.with_timeout(TABLE_GC_RPC_TIMEOUT),
			)
			.await
			.err_context("GC: remote delete tombstones")?;

		// GC has been successfull for all of these entries.
		// We now remove them all from our local table and from the GC todo list.
		for item in items {
			self.data
				.delete_if_equal_hash(&item.key[..], item.value_hash)
				.err_context("GC: local delete tombstones")?;
			item.remove_if_equal(&self.data.gc_todo)
				.err_context("GC: remove from todo list after successfull GC")?;
		}

		Ok(())
	}
}

#[async_trait]
impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R>
where
	F: TableSchema + 'static,
	R: TableReplication + 'static,
{
	async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> {
		match message {
			GcRpc::Update(items) => {
				self.data.update_many(items)?;
				Ok(GcRpc::Ok)
			}
			GcRpc::DeleteIfEqualHash(items) => {
				for (key, vhash) in items.iter() {
					self.data.delete_if_equal_hash(&key[..], *vhash)?;
				}
				Ok(GcRpc::Ok)
			}
			m => Err(Error::unexpected_rpc_message(m)),
		}
	}
}

/// An entry stored in the gc_todo Sled tree associated with the table
/// Contains helper function for parsing, saving, and removing
/// such entry in Sled
///
/// Format of an entry:
/// - key =    8 bytes: timestamp of tombstone
///                     (used to implement GC delay)
///            n bytes: key in the main data table
/// - value =  hash of the table entry to delete (the tombstone)
///            for verification purpose, because we don't want to delete
///            things that aren't tombstones
pub(crate) struct GcTodoEntry {
	tombstone_timestamp: u64,
	key: Vec<u8>,
	value_hash: Hash,
	value: Option<Vec<u8>>,
}

impl GcTodoEntry {
	/// Creates a new GcTodoEntry (not saved in Sled) from its components:
	/// the key of an entry in the table, and the hash of the associated
	/// serialized value
	pub(crate) fn new(key: Vec<u8>, value_hash: Hash) -> Self {
		Self {
			tombstone_timestamp: now_msec(),
			key,
			value_hash,
			value: None,
		}
	}

	/// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree
	pub(crate) fn parse(db_k: &[u8], db_v: &[u8]) -> Self {
		Self {
			tombstone_timestamp: u64::from_be_bytes(db_k[0..8].try_into().unwrap()),
			key: db_k[8..].to_vec(),
			value_hash: Hash::try_from(db_v).unwrap(),
			value: None,
		}
	}

	/// Saves the GcTodoEntry in the gc_todo tree
	pub(crate) fn save(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> {
		gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
		Ok(())
	}

	/// Removes the GcTodoEntry from the gc_todo tree if the
	/// hash of the serialized value is the same here as in the tree.
	/// This is usefull to remove a todo entry only under the condition
	/// that it has not changed since the time it was read, i.e.
	/// what we have to do is still the same
	pub(crate) fn remove_if_equal(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> {
		let key = self.todo_table_key();
		gc_todo_tree.db().transaction(|mut tx| {
			let remove =
				matches!(tx.get(gc_todo_tree, &key)?, Some(ov) if ov == self.value_hash.as_slice());
			if remove {
				tx.remove(gc_todo_tree, &key)?;
			}
			tx.commit(())
		})?;
		Ok(())
	}

	fn todo_table_key(&self) -> Vec<u8> {
		[
			&u64::to_be_bytes(self.tombstone_timestamp)[..],
			&self.key[..],
		]
		.concat()
	}

	fn deletion_time(&self) -> u64 {
		self.tombstone_timestamp + TABLE_GC_DELAY.as_millis() as u64
	}
}