aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout/schema.rs
blob: 969f5a0b17b8fbb4bf761928d4ea02af126b060c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
use std::fmt;

use bytesize::ByteSize;

use garage_util::crdt::{AutoCrdt, Crdt};
use garage_util::data::Uuid;

mod v08 {
	use crate::layout::CompactNodeType;
	use garage_util::crdt::LwwMap;
	use garage_util::data::{Hash, Uuid};
	use serde::{Deserialize, Serialize};

	/// The layout of the cluster, i.e. the list of roles
	/// which are assigned to each cluster node
	///
	/// This is the `v08` schema of the layout; `v09::ClusterLayout` declares it
	/// as its `Previous` format and is migrated from it.
	#[derive(Clone, Debug, Serialize, Deserialize)]
	pub struct ClusterLayout {
		/// Version number of this layout
		pub version: u64,

		/// Replication factor of the cluster
		/// (NOTE(review): presumably the number of copies kept of each partition; confirm)
		pub replication_factor: usize,
		/// Role assigned to each node. Nodes without a role stay in this
		/// CRDT as tombstones (see `node_id_vec` below).
		pub roles: LwwMap<Uuid, NodeRoleV>,

		/// node_id_vec: a vector of node IDs with a role assigned
		/// in the system (this includes gateway nodes).
		/// The order here is different than the vec stored by `roles`, because:
		/// 1. non-gateway nodes are first so that they have lower numbers
		/// 2. nodes that don't have a role are excluded (but they need to
		///    stay in the CRDT as tombstones)
		pub node_id_vec: Vec<Uuid>,
		/// the assignation of data partitions to node, the values
		/// are indices in node_id_vec
		#[serde(with = "serde_bytes")]
		pub ring_assignation_data: Vec<CompactNodeType>,

		/// Role changes which are staged for the next version of the layout
		pub staging: LwwMap<Uuid, NodeRoleV>,
		/// Hash associated with the staged changes
		/// (NOTE(review): how it is computed is not visible in this file; confirm)
		pub staging_hash: Hash,
	}

	/// Wrapper around an optional [`NodeRole`], used as the value type of the
	/// role maps of the layout.
	/// NOTE(review): `None` presumably is the tombstone of a node whose role
	/// was removed (see the comment on `ClusterLayout::node_id_vec`); confirm.
	#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
	pub struct NodeRoleV(pub Option<NodeRole>);

	/// The user-assigned role of a cluster node
	#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
	pub struct NodeRole {
		/// Datacenter to which this entry belongs. This information is used to
		/// perform a better geodistribution
		pub zone: String,
		/// The capacity of the node
		/// If this is set to None, the node does not participate in storing data for the system
		/// and is only active as an API gateway to other nodes
		pub capacity: Option<u64>,
		/// A set of tags to recognize the node
		pub tags: Vec<String>,
	}

	// `v08::ClusterLayout` is the starting point of the migration chain:
	// `v09::ClusterLayout` declares it as its `Previous` format.
	// NOTE(review): `InitialFormat` is defined in `garage_util::migrate`; presumably
	// it marks a format that has no predecessor to migrate from. Confirm there.
	impl garage_util::migrate::InitialFormat for ClusterLayout {}
}

mod v09 {
	use super::v08;
	use crate::layout::CompactNodeType;
	use garage_util::crdt::{Lww, LwwMap};
	use garage_util::data::{Hash, Uuid};
	use serde::{Deserialize, Serialize};
	pub use v08::{NodeRole, NodeRoleV};

	/// The layout of the cluster, i.e. the list of roles
	/// which are assigned to each cluster node
	///
	/// This is the `v09` schema of the layout; it is obtained from
	/// `v08::ClusterLayout` through the `Migrate` implementation in this module.
	#[derive(Clone, Debug, Serialize, Deserialize)]
	pub struct ClusterLayout {
		/// Version number of this layout
		pub version: u64,

		/// Replication factor of the cluster
		/// (NOTE(review): presumably the number of copies kept of each partition; confirm)
		pub replication_factor: usize,

		/// This attribute is only used to retain the previously computed partition size,
		/// to know to what extent it changes with the layout update.
		pub partition_size: u64,
		/// Parameters used to compute the assignment currently given by
		/// ring_assignment_data
		pub parameters: LayoutParameters,

		/// Role assigned to each node
		pub roles: LwwMap<Uuid, NodeRoleV>,

		/// see comment in v08::ClusterLayout
		pub node_id_vec: Vec<Uuid>,
		/// see comment in v08::ClusterLayout
		/// (the field is named `ring_assignation_data` there)
		#[serde(with = "serde_bytes")]
		pub ring_assignment_data: Vec<CompactNodeType>,

		/// Parameters to be used in the next partition assignment computation.
		pub staging_parameters: Lww<LayoutParameters>,
		/// Role changes which are staged for the next version of the layout
		pub staging_roles: LwwMap<Uuid, NodeRoleV>,
		/// Hash associated with the staged changes. The migration from v08
		/// fills it with zeroes.
		pub staging_hash: Hash,
	}

	/// This struct is used to set the parameters to be used in the assignment computation
	/// algorithm. It is stored as a Crdt.
	#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
	pub struct LayoutParameters {
		/// How many distinct zones the copies of a partition should be spread over
		/// (see [`ZoneRedundancy`])
		pub zone_redundancy: ZoneRedundancy,
	}

	/// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies
	/// of each partition on at least that number of different zones.
	/// Otherwise, copies will be stored on the maximum possible number of zones.
	// Do not reorder the variants: the derived `PartialOrd`/`Ord` compare
	// variants by declaration order.
	#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
	pub enum ZoneRedundancy {
		/// Aim for copies of each partition on at least this many different zones
		AtLeast(usize),
		/// Store copies on the maximum possible number of zones
		Maximum,
	}

	impl garage_util::migrate::Migrate for ClusterLayout {
		const VERSION_MARKER: &'static [u8] = b"G09layout";

		type Previous = v08::ClusterLayout;

		/// Builds a v09 layout from a v08 layout.
		///
		/// Capacities (current and staged) are rescaled to bytes, the partition
		/// size is derived from the existing assignment, and the layout
		/// parameters (current and staged) are initialised to
		/// `ZoneRedundancy::Maximum`. The staging hash is left zeroed.
		fn migrate(previous: Self::Previous) -> Self {
			use itertools::Itertools;

			// Old capacities are expressed in an arbitrary unit whereas new ones
			// are in bytes. One old unit is arbitrarily mapped to 1024^3 bytes;
			// this choice will not be right for most users.
			const BYTES_PER_OLD_UNIT: u64 = 1024 * 1024 * 1024;

			let v08::ClusterLayout {
				version,
				replication_factor,
				roles: old_roles,
				node_id_vec,
				ring_assignation_data,
				staging: old_staging,
				..
			} = previous;

			let roles = multiply_all_capacities(old_roles, BYTES_PER_OLD_UNIT);
			let staging_roles = multiply_all_capacities(old_staging, BYTES_PER_OLD_UNIT);

			// Capacity (already rescaled) of the node stored at `index` in
			// node_id_vec; 0 if the node has no role or no capacity.
			let capacity_at = |index: usize| -> u64 {
				roles
					.get(&node_id_vec[index])
					.and_then(|entry| entry.0.as_ref())
					.and_then(|role| role.capacity)
					.unwrap_or(0)
			};

			// Partition size: for each node present in the assignment, divide its
			// capacity by the number of times it appears, and keep the smallest
			// quotient (0 if the assignment is empty).
			let mut sorted_assignment = ring_assignation_data.clone();
			sorted_assignment.sort();
			let partition_size = sorted_assignment
				.into_iter()
				.dedup_with_count()
				.map(|(occurrences, node)| capacity_at(node as usize) / occurrences as u64)
				.min()
				.unwrap_or(0);

			// zone_redundancy defaults to the maximum possible value
			let parameters = LayoutParameters {
				zone_redundancy: ZoneRedundancy::Maximum,
			};

			Self {
				version,
				replication_factor,
				partition_size,
				parameters,
				roles,
				node_id_vec,
				ring_assignment_data: ring_assignation_data,
				staging_parameters: Lww::new(parameters),
				staging_roles,
				staging_hash: [0u8; 32].into(), // will be set in the next migration
			}
		}
	}

	fn multiply_all_capacities(
		old_roles: LwwMap<Uuid, NodeRoleV>,
		mul: u64,
	) -> LwwMap<Uuid, NodeRoleV> {
		let mut new_roles = LwwMap::new();
		for (node, ts, role) in old_roles.items() {
			let mut role = role.clone();
			if let NodeRoleV(Some(NodeRole {
				capacity: Some(ref mut cap),
				..
			})) = role
			{
				*cap *= mul;
			}
			new_roles.merge_raw(node, *ts, &role);
		}
		new_roles
	}
}

mod v010 {
	use super::v09;
	use crate::layout::CompactNodeType;
	use garage_util::crdt::{Lww, LwwMap};
	use garage_util::data::{Hash, Uuid};
	use serde::{Deserialize, Serialize};
	use std::collections::BTreeMap;
	pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy};

	/// The history of cluster layouts, with trackers to keep a record
	/// of which nodes are up-to-date to current cluster data
	///
	/// This is the `v010` schema; it is obtained from `v09::ClusterLayout`
	/// through the `Migrate` implementation in this module.
	#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
	pub struct LayoutHistory {
		/// The versions currently in use in the cluster
		pub versions: Vec<LayoutVersion>,

		/// Update trackers
		pub update_trackers: UpdateTrackers,
		/// Hash of the update trackers
		pub trackers_hash: Hash,

		/// Staged changes for the next version
		pub staging: Lww<LayoutStaging>,
		/// Hash of the serialized staging_parameters + staging_roles
		/// (i.e. of the content of `staging`)
		pub staging_hash: Hash,
	}

	/// A version of the layout of the cluster, i.e. the list of roles
	/// which are assigned to each cluster node
	#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
	pub struct LayoutVersion {
		/// Version number of this layout
		pub version: u64,

		/// Replication factor of the cluster
		/// (NOTE(review): presumably the number of copies kept of each partition; confirm)
		pub replication_factor: usize,

		/// This attribute is only used to retain the previously computed partition size,
		/// to know to what extent it changes with the layout update.
		pub partition_size: u64,
		/// Parameters used to compute the assignment currently given by
		/// ring_assignment_data
		pub parameters: LayoutParameters,

		/// Role assigned to each node
		pub roles: LwwMap<Uuid, NodeRoleV>,

		/// see comment in v08::ClusterLayout
		pub node_id_vec: Vec<Uuid>,
		/// number of non-gateway nodes, which are the first ids in node_id_vec
		pub nongateway_node_count: usize,
		/// see comment in v08::ClusterLayout
		/// (the field is named `ring_assignation_data` there)
		#[serde(with = "serde_bytes")]
		pub ring_assignment_data: Vec<CompactNodeType>,
	}

	/// The staged changes for the next layout version
	///
	/// Both fields are CRDTs; two stagings are merged field by field
	/// (see the `Crdt` implementation for this type).
	#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
	pub struct LayoutStaging {
		/// Parameters to be used in the next partition assignment computation.
		pub parameters: Lww<LayoutParameters>,
		/// Role changes which are staged for the next version of the layout
		pub roles: LwwMap<Uuid, NodeRoleV>,
	}

	/// The trackers of acknowledgments and data syncs around the cluster
	#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
	pub struct UpdateTrackers {
		/// The highest layout version number each node has ack'ed
		pub ack_map: UpdateTracker,
		/// The highest layout version number each node has synced data for
		pub sync_map: UpdateTracker,
		/// The highest layout version number each node has
		/// ack'ed that all other nodes have synced data for
		pub sync_ack_map: UpdateTracker,
	}

	/// A single update tracker: for each node, the highest layout version
	/// number that the node has reached for the tracked event
	/// (see the fields of [`UpdateTrackers`] for the events that are tracked)
	#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
	pub struct UpdateTracker {
		/// Layout version number recorded for each node
		pub values: BTreeMap<Uuid, u64>,
		/// Cached minimum of `values` over the storage nodes,
		/// as last recomputed by `update_min`
		pub current_min: u64,
	}

	impl garage_util::migrate::Migrate for LayoutHistory {
		const VERSION_MARKER: &'static [u8] = b"G010lh";

		type Previous = v09::ClusterLayout;

		fn migrate(previous: Self::Previous) -> Self {
			let nongateway_node_count = previous
				.node_id_vec
				.iter()
				.enumerate()
				.filter(|(_, uuid)| {
					let role = previous.roles.get(uuid);
					matches!(role, Some(NodeRoleV(Some(role))) if role.capacity.is_some())
				})
				.map(|(i, _)| i + 1)
				.max()
				.unwrap_or(0);

			let version = LayoutVersion {
				version: previous.version,
				replication_factor: previous.replication_factor,
				partition_size: previous.partition_size,
				parameters: previous.parameters,
				roles: previous.roles,
				node_id_vec: previous.node_id_vec,
				nongateway_node_count,
				ring_assignment_data: previous.ring_assignment_data,
			};
			let update_tracker = UpdateTracker {
				values: version
					.nongateway_nodes()
					.iter()
					.copied()
					.map(|x| (x, version.version))
					.collect::<BTreeMap<Uuid, u64>>(),
				current_min: 0,
			};
			let staging = LayoutStaging {
				parameters: previous.staging_parameters,
				roles: previous.staging_roles,
			};
			let mut ret = Self {
				versions: vec![version],
				update_trackers: UpdateTrackers {
					ack_map: update_tracker.clone(),
					sync_map: update_tracker.clone(),
					sync_ack_map: update_tracker.clone(),
				},
				trackers_hash: [0u8; 32].into(),
				staging: Lww::raw(previous.version, staging),
				staging_hash: [0u8; 32].into(),
			};
			ret.update_hashes();
			ret
		}
	}
}

pub use v010::*;

// ---- utility functions ----

// Makes `LayoutParameters` usable as a CRDT value through `AutoCrdt`.
// NOTE(review): the exact effect of `WARN_IF_DIFFERENT` is defined in
// `garage_util::crdt`; presumably a warning is logged when two differing
// values are merged. Confirm there.
impl AutoCrdt for LayoutParameters {
	const WARN_IF_DIFFERENT: bool = true;
}

// Makes `NodeRoleV` usable as a CRDT value through `AutoCrdt`.
// NOTE(review): the exact effect of `WARN_IF_DIFFERENT` is defined in
// `garage_util::crdt`; presumably a warning is logged when two differing
// values are merged. Confirm there.
impl AutoCrdt for NodeRoleV {
	const WARN_IF_DIFFERENT: bool = true;
}

impl Crdt for LayoutStaging {
	fn merge(&mut self, other: &LayoutStaging) {
		self.parameters.merge(&other.parameters);
		self.roles.merge(&other.roles);
	}
}

impl NodeRole {
	/// Human-readable capacity of the node: the capacity formatted with
	/// `ByteSize::to_string_as(false)`, or `"gateway"` for a node that has
	/// no capacity.
	pub fn capacity_string(&self) -> String {
		if let Some(bytes) = self.capacity {
			ByteSize::b(bytes).to_string_as(false)
		} else {
			String::from("gateway")
		}
	}

	/// The tags of the node, joined by commas (no space after the comma).
	pub fn tags_string(&self) -> String {
		let tags: &[String] = &self.tags;
		tags.join(",")
	}
}

impl fmt::Display for ZoneRedundancy {
	/// Writes `maximum` for `Maximum`, and the bare number for `AtLeast`.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match *self {
			ZoneRedundancy::AtLeast(min_zones) => write!(f, "{}", min_zones),
			ZoneRedundancy::Maximum => f.write_str("maximum"),
		}
	}
}

impl core::str::FromStr for ZoneRedundancy {
	type Err = &'static str;

	/// Parses a zone redundancy: `none`, `max` and `maximum` all give
	/// `Maximum`; any other input must parse as a `usize` and gives
	/// `AtLeast` of that number.
	///
	/// # Errors
	/// Returns a static message when the input is neither one of the
	/// keywords nor a valid `usize`.
	fn from_str(s: &str) -> Result<Self, Self::Err> {
		if matches!(s, "none" | "max" | "maximum") {
			return Ok(ZoneRedundancy::Maximum);
		}
		s.parse::<usize>()
			.map(ZoneRedundancy::AtLeast)
			.map_err(|_| "zone redundancy must be 'none'/'max' or an integer")
	}
}

impl UpdateTracker {
	/// Merges `other` into `self`, keeping for every node the greater of the
	/// two recorded values. Returns `true` if `self` was modified.
	fn merge(&mut self, other: &UpdateTracker) -> bool {
		let mut modified = false;
		for (node, &theirs) in &other.values {
			// Take their value when we have none, or when ours is strictly lower
			let ours = self.values.get(node).copied();
			if ours.map_or(true, |current| theirs > current) {
				self.values.insert(*node, theirs);
				modified = true;
			}
		}
		modified
	}

	/// Raises the value recorded for `peer` to `value` if it is currently
	/// lower or absent. Returns `true` if the tracker was modified.
	pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) -> bool {
		let must_raise = self
			.values
			.get(&peer)
			.map_or(true, |&known| known < value);
		if must_raise {
			self.values.insert(peer, value);
		}
		must_raise
	}

	/// Recomputes `current_min` as the smallest value recorded among
	/// `storage_nodes`. A node without a recorded value counts as
	/// `min_version`, which is also the result when `storage_nodes` is empty.
	fn update_min(&mut self, storage_nodes: &[Uuid], min_version: u64) {
		let lowest = storage_nodes
			.iter()
			.map(|node| self.values.get(node).map_or(min_version, |v| *v))
			.min()
			.unwrap_or(min_version);
		self.current_min = lowest;
	}
}

impl UpdateTrackers {
	/// Merges the three trackers of `other` into the corresponding trackers
	/// of `self`. Returns `true` if at least one of them was modified.
	pub(crate) fn merge(&mut self, other: &UpdateTrackers) -> bool {
		// All three merges are evaluated before being combined, so that `||`
		// cannot short-circuit and skip one of them.
		let c1 = self.ack_map.merge(&other.ack_map);
		let c2 = self.sync_map.merge(&other.sync_map);
		let c3 = self.sync_ack_map.merge(&other.sync_ack_map);
		c1 || c2 || c3
	}

	/// Recomputes the cached minimum of each of the three trackers over
	/// `storage_nodes`, using `min_version` for nodes without a recorded value.
	pub(crate) fn update_min(&mut self, storage_nodes: &[Uuid], min_version: u64) {
		// `storage_nodes` is already a slice reference: pass it through as is
		self.ack_map.update_min(storage_nodes, min_version);
		self.sync_map.update_min(storage_nodes, min_version);
		self.sync_ack_map.update_min(storage_nodes, min_version);
	}
}