aboutsummaryrefslogblamecommitdiff
path: root/src/rpc/layout/mod.rs
blob: facdb2ce5b8fedc313f2e031425ec19f9c23eaec (plain) (tree)
1
2
3
4
5
6
7
8
9
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478






                                        
               
           
            

            


            

                

                       
                                             
                           
























                                                                                                  















































































































































































































































































































































































































































                                                                                                              
use std::fmt;

use bytesize::ByteSize;

use garage_util::crdt::{AutoCrdt, Crdt};
use garage_util::data::Uuid;

mod graph_algo;
mod helper;
mod history;
mod version;

#[cfg(test)]
mod test;

pub mod manager;

// ---- re-exports ----

pub use helper::{LayoutDigest, LayoutHelper};
pub use manager::WriteLock;
pub use version::*;

// ---- defines: partitions ----

/// A partition id, which is stored on 16 bits
/// i.e. we have up to 2**16 partitions.
/// (in practice we have exactly 2**PARTITION_BITS partitions)
pub type Partition = u16;

// TODO: make this constant parametrizable in the config file
// For deployments with many nodes it might make sense to bump
// it up to 10.
// Maximum value : 16
/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in
/// presence of numerous nodes, but exponentially bigger ring. Max 16
pub const PARTITION_BITS: usize = 8;

const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;

// ---- defines: nodes ----

// Type to store compactly the id of a node in the system
// Change this to u16 the day we want to have more than 256 nodes in a cluster
pub type CompactNodeType = u8;
pub const MAX_NODE_NUMBER: usize = 256;

// ======== actual data structures for the layout data ========
// ======== that is persisted to disk                  ========
// some small utility impls are at the end of this file,
// but most of the code that actually computes stuff is in
// version.rs, history.rs and helper.rs

mod v08 {
	use crate::layout::CompactNodeType;
	use garage_util::crdt::LwwMap;
	use garage_util::data::{Hash, Uuid};
	use serde::{Deserialize, Serialize};

	/// The layout of the cluster, i.e. the list of roles
	/// which are assigned to each cluster node
	#[derive(Clone, Debug, Serialize, Deserialize)]
	pub struct ClusterLayout {
		pub version: u64,

		pub replication_factor: usize,
		pub roles: LwwMap<Uuid, NodeRoleV>,

		// see comments in v010::ClusterLayout
		pub node_id_vec: Vec<Uuid>,
		#[serde(with = "serde_bytes")]
		pub ring_assignation_data: Vec<CompactNodeType>,

		/// Role changes which are staged for the next version of the layout
		pub staging: LwwMap<Uuid, NodeRoleV>,
		pub staging_hash: Hash,
	}

	#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
	pub struct NodeRoleV(pub Option<NodeRole>);

	/// The user-assigned roles of cluster nodes
	#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
	pub struct NodeRole {
		/// Datacenter at which this entry belong. This information is used to
		/// perform a better geodistribution
		pub zone: String,
		/// The capacity of the node
		/// If this is set to None, the node does not participate in storing data for the system
		/// and is only active as an API gateway to other nodes
		pub capacity: Option<u64>,
		/// A set of tags to recognize the node
		pub tags: Vec<String>,
	}

	impl garage_util::migrate::InitialFormat for ClusterLayout {}
}

mod v09 {
	use super::v08;
	use crate::layout::CompactNodeType;
	use garage_util::crdt::{Lww, LwwMap};
	use garage_util::data::{Hash, Uuid};
	use serde::{Deserialize, Serialize};
	pub use v08::{NodeRole, NodeRoleV};

	/// The layout of the cluster, i.e. the list of roles
	/// which are assigned to each cluster node
	#[derive(Clone, Debug, Serialize, Deserialize)]
	pub struct ClusterLayout {
		pub version: u64,

		pub replication_factor: usize,

		/// This attribute is only used to retain the previously computed partition size,
		/// to know to what extent does it change with the layout update.
		pub partition_size: u64,
		/// Parameters used to compute the assignment currently given by
		/// ring_assignment_data
		pub parameters: LayoutParameters,

		pub roles: LwwMap<Uuid, NodeRoleV>,

		// see comments in v010::ClusterLayout
		pub node_id_vec: Vec<Uuid>,
		#[serde(with = "serde_bytes")]
		pub ring_assignment_data: Vec<CompactNodeType>,

		/// Parameters to be used in the next partition assignment computation.
		pub staging_parameters: Lww<LayoutParameters>,
		/// Role changes which are staged for the next version of the layout
		pub staging_roles: LwwMap<Uuid, NodeRoleV>,
		pub staging_hash: Hash,
	}

	/// This struct is used to set the parameters to be used in the assignment computation
	/// algorithm. It is stored as a Crdt.
	#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
	pub struct LayoutParameters {
		pub zone_redundancy: ZoneRedundancy,
	}

	/// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies
	/// of each partition on at least that number of different zones.
	/// Otherwise, copies will be stored on the maximum possible number of zones.
	#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
	pub enum ZoneRedundancy {
		AtLeast(usize),
		Maximum,
	}

	impl garage_util::migrate::Migrate for ClusterLayout {
		const VERSION_MARKER: &'static [u8] = b"G09layout";

		type Previous = v08::ClusterLayout;

		fn migrate(previous: Self::Previous) -> Self {
			use itertools::Itertools;

			// In the old layout, capacities are in an arbitrary unit,
			// but in the new layout they are in bytes.
			// Here we arbitrarily multiply everything by 1G,
			// such that 1 old capacity unit = 1GB in the new units.
			// This is totally arbitrary and won't work for most users.
			let cap_mul = 1024 * 1024 * 1024;
			let roles = multiply_all_capacities(previous.roles, cap_mul);
			let staging_roles = multiply_all_capacities(previous.staging, cap_mul);
			let node_id_vec = previous.node_id_vec;

			// Determine partition size
			let mut tmp = previous.ring_assignation_data.clone();
			tmp.sort();
			let partition_size = tmp
				.into_iter()
				.dedup_with_count()
				.map(|(npart, node)| {
					roles
						.get(&node_id_vec[node as usize])
						.and_then(|p| p.0.as_ref().and_then(|r| r.capacity))
						.unwrap_or(0) / npart as u64
				})
				.min()
				.unwrap_or(0);

			// By default, zone_redundancy is maximum possible value
			let parameters = LayoutParameters {
				zone_redundancy: ZoneRedundancy::Maximum,
			};

			Self {
				version: previous.version,
				replication_factor: previous.replication_factor,
				partition_size,
				parameters,
				roles,
				node_id_vec,
				ring_assignment_data: previous.ring_assignation_data,
				staging_parameters: Lww::new(parameters),
				staging_roles,
				staging_hash: [0u8; 32].into(), // will be set in the next migration
			}
		}
	}

	fn multiply_all_capacities(
		old_roles: LwwMap<Uuid, NodeRoleV>,
		mul: u64,
	) -> LwwMap<Uuid, NodeRoleV> {
		let mut new_roles = LwwMap::new();
		for (node, ts, role) in old_roles.items() {
			let mut role = role.clone();
			if let NodeRoleV(Some(NodeRole {
				capacity: Some(ref mut cap),
				..
			})) = role
			{
				*cap *= mul;
			}
			new_roles.merge_raw(node, *ts, &role);
		}
		new_roles
	}
}

mod v010 {
	use super::v09;
	use crate::layout::CompactNodeType;
	use garage_util::crdt::{Lww, LwwMap};
	use garage_util::data::Uuid;
	use serde::{Deserialize, Serialize};
	use std::collections::BTreeMap;
	pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy};

	/// Number of old (non-live) versions to keep, see LayoutHistory::old_versions
	pub const OLD_VERSION_COUNT: usize = 5;

	/// The history of cluster layouts, with trackers to keep a record
	/// of which nodes are up-to-date to current cluster data
	#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
	pub struct LayoutHistory {
		/// The versions currently in use in the cluster
		pub versions: Vec<LayoutVersion>,
		/// At most 5 of the previous versions, not used by the garage_table
		/// module, but usefull for the garage_block module to find data blocks
		/// that have not yet been moved
		pub old_versions: Vec<LayoutVersion>,

		/// Update trackers
		pub update_trackers: UpdateTrackers,

		/// Staged changes for the next version
		pub staging: Lww<LayoutStaging>,
	}

	/// A version of the layout of the cluster, i.e. the list of roles
	/// which are assigned to each cluster node
	#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
	pub struct LayoutVersion {
		/// The number of this version
		pub version: u64,

		/// Roles assigned to nodes in this version
		pub roles: LwwMap<Uuid, NodeRoleV>,
		/// Parameters used to compute the assignment currently given by
		/// ring_assignment_data
		pub parameters: LayoutParameters,

		/// The number of replicas for each data partition
		pub replication_factor: usize,
		/// This attribute is only used to retain the previously computed partition size,
		/// to know to what extent does it change with the layout update.
		pub partition_size: u64,

		/// node_id_vec: a vector of node IDs with a role assigned
		/// in the system (this includes gateway nodes).
		/// The order here is different than the vec stored by `roles`, because:
		/// 1. non-gateway nodes are first so that they have lower numbers
		/// 2. nodes that don't have a role are excluded (but they need to
		///    stay in the CRDT as tombstones)
		pub node_id_vec: Vec<Uuid>,
		/// number of non-gateway nodes, which are the first ids in node_id_vec
		pub nongateway_node_count: usize,
		/// The assignation of data partitions to nodes, the values
		/// are indices in node_id_vec
		#[serde(with = "serde_bytes")]
		pub ring_assignment_data: Vec<CompactNodeType>,
	}

	/// The staged changes for the next layout version
	#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
	pub struct LayoutStaging {
		/// Parameters to be used in the next partition assignment computation.
		pub parameters: Lww<LayoutParameters>,
		/// Role changes which are staged for the next version of the layout
		pub roles: LwwMap<Uuid, NodeRoleV>,
	}

	/// The tracker of acknowlegments and data syncs around the cluster
	#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
	pub struct UpdateTrackers {
		/// The highest layout version number each node has ack'ed
		pub ack_map: UpdateTracker,
		/// The highest layout version number each node has synced data for
		pub sync_map: UpdateTracker,
		/// The highest layout version number each node has
		/// ack'ed that all other nodes have synced data for
		pub sync_ack_map: UpdateTracker,
	}

	/// Generic update tracker struct
	#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
	pub struct UpdateTracker(pub BTreeMap<Uuid, u64>);

	impl garage_util::migrate::Migrate for LayoutHistory {
		const VERSION_MARKER: &'static [u8] = b"G010lh";

		type Previous = v09::ClusterLayout;

		fn migrate(previous: Self::Previous) -> Self {
			let nongateway_node_count = previous
				.node_id_vec
				.iter()
				.enumerate()
				.filter(|(_, uuid)| {
					let role = previous.roles.get(uuid);
					matches!(role, Some(NodeRoleV(Some(role))) if role.capacity.is_some())
				})
				.map(|(i, _)| i + 1)
				.max()
				.unwrap_or(0);

			let version = LayoutVersion {
				version: previous.version,
				replication_factor: previous.replication_factor,
				partition_size: previous.partition_size,
				parameters: previous.parameters,
				roles: previous.roles,
				node_id_vec: previous.node_id_vec,
				nongateway_node_count,
				ring_assignment_data: previous.ring_assignment_data,
			};
			let update_tracker = UpdateTracker(
				version
					.nongateway_nodes()
					.iter()
					.copied()
					.map(|x| (x, version.version))
					.collect::<BTreeMap<Uuid, u64>>(),
			);
			let staging = LayoutStaging {
				parameters: previous.staging_parameters,
				roles: previous.staging_roles,
			};
			Self {
				versions: vec![version],
				old_versions: vec![],
				update_trackers: UpdateTrackers {
					ack_map: update_tracker.clone(),
					sync_map: update_tracker.clone(),
					sync_ack_map: update_tracker.clone(),
				},
				staging: Lww::raw(previous.version, staging),
			}
		}
	}
}

pub use v010::*;

// ---- utility functions ----

impl AutoCrdt for LayoutParameters {
	const WARN_IF_DIFFERENT: bool = true;
}

impl AutoCrdt for NodeRoleV {
	const WARN_IF_DIFFERENT: bool = true;
}

impl Crdt for LayoutStaging {
	fn merge(&mut self, other: &LayoutStaging) {
		self.parameters.merge(&other.parameters);
		self.roles.merge(&other.roles);
	}
}

impl NodeRole {
	pub fn capacity_string(&self) -> String {
		match self.capacity {
			Some(c) => ByteSize::b(c).to_string_as(false),
			None => "gateway".to_string(),
		}
	}

	pub fn tags_string(&self) -> String {
		self.tags.join(",")
	}
}

impl fmt::Display for ZoneRedundancy {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match self {
			ZoneRedundancy::Maximum => write!(f, "maximum"),
			ZoneRedundancy::AtLeast(x) => write!(f, "{}", x),
		}
	}
}

impl core::str::FromStr for ZoneRedundancy {
	type Err = &'static str;
	fn from_str(s: &str) -> Result<Self, Self::Err> {
		match s {
			"none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum),
			x => {
				let v = x
					.parse::<usize>()
					.map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?;
				Ok(ZoneRedundancy::AtLeast(v))
			}
		}
	}
}

impl UpdateTracker {
	fn merge(&mut self, other: &UpdateTracker) -> bool {
		let mut changed = false;
		for (k, v) in other.0.iter() {
			if let Some(v_mut) = self.0.get_mut(k) {
				if *v > *v_mut {
					*v_mut = *v;
					changed = true;
				}
			} else {
				self.0.insert(*k, *v);
				changed = true;
			}
		}
		changed
	}

	/// This bumps the update tracker for a given node up to the specified value.
	/// This has potential impacts on the correctness of Garage and should only
	/// be used in very specific circumstances.
	pub fn set_max(&mut self, peer: Uuid, value: u64) -> bool {
		match self.0.get_mut(&peer) {
			Some(e) if *e < value => {
				*e = value;
				true
			}
			None => {
				self.0.insert(peer, value);
				true
			}
			_ => false,
		}
	}

	pub(crate) fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 {
		storage_nodes
			.iter()
			.map(|x| self.get(x, min_version))
			.min()
			.unwrap_or(min_version)
	}

	pub fn get(&self, node: &Uuid, min_version: u64) -> u64 {
		self.0.get(node).copied().unwrap_or(min_version)
	}
}

impl UpdateTrackers {
	pub(crate) fn merge(&mut self, other: &UpdateTrackers) -> bool {
		let c1 = self.ack_map.merge(&other.ack_map);
		let c2 = self.sync_map.merge(&other.sync_map);
		let c3 = self.sync_ack_map.merge(&other.sync_ack_map);
		c1 || c2 || c3
	}
}