aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/ring.rs
blob: 7cbab76235f7f39141f8cd55b4c5c5bf17aed089 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
//! Module containing types related to computing nodes which should receive a copy of data blocks
//! and metadata
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;

use netapp::NodeID;

use serde::{Deserialize, Serialize};

use garage_util::data::*;

/// A partition id, which is stored on 16 bits
/// i.e. we have up to 2**16 partitions.
/// (in practice we have exactly 2**PARTITION_BITS partitions)
pub type Partition = u16;

// TODO: make this constant parametrizable in the config file
// For deployments with many nodes it might make sense to bump
// it up to 10.
// Maximum value : 16
/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in
/// presence of numerous nodes, but exponentially bigger ring. Max 16
pub const PARTITION_BITS: usize = 8;

const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);

/// The user-defined configuration of the cluster's nodes
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfig {
	/// Map of each node's id to it's configuration
	pub members: HashMap<Uuid, NetworkConfigEntry>,
	/// Version of this config
	pub version: u64,
}

impl NetworkConfig {
	pub(crate) fn new() -> Self {
		Self {
			members: HashMap::new(),
			version: 0,
		}
	}

	pub(crate) fn migrate_from_021(old: garage_rpc_021::ring::NetworkConfig) -> Self {
		let members = old
			.members
			.into_iter()
			.map(|(id, conf)| {
				(
					Hash::try_from(id.as_slice()).unwrap(),
					NetworkConfigEntry {
						zone: conf.datacenter,
						capacity: if conf.capacity == 0 {
							None
						} else {
							Some(conf.capacity)
						},
						tag: conf.tag,
					},
				)
			})
			.collect();
		Self {
			members,
			version: old.version,
		}
	}
}

/// The overall configuration of one (possibly remote) node
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfigEntry {
	/// Datacenter at which this entry belong. This infromation might be used to perform a better
	/// geodistribution
	pub zone: String,
	/// The (relative) capacity of the node
	/// If this is set to None, the node does not participate in storing data for the system
	/// and is only active as an API gateway to other nodes
	pub capacity: Option<u32>,
	/// A tag to recognize the entry, not used for other things than display
	pub tag: String,
}

impl NetworkConfigEntry {
	pub fn capacity_string(&self) -> String {
		match self.capacity {
			Some(c) => format!("{}", c),
			None => "gateway".to_string(),
		}
	}
}

/// A ring distributing fairly objects to nodes
#[derive(Clone)]
pub struct Ring {
	/// The replication factor for this ring
	pub replication_factor: usize,

	/// The network configuration used to generate this ring
	pub config: NetworkConfig,

	// Internal order of nodes used to make a more compact representation of the ring
	nodes: Vec<NodeID>,

	// The list of entries in the ring
	ring: Vec<RingEntry>,
}

// Type to store compactly the id of a node in the system
// Change this to u16 the day we want to have more than 256 nodes in a cluster
type CompactNodeType = u8;

// The maximum number of times an object might get replicated
// This must be at least 3 because Garage supports 3-way replication
// Here we use 6 so that the size of a ring entry is 8 bytes
// (2 bytes partition id, 6 bytes node numbers as u8s)
const MAX_REPLICATION: usize = 6;

/// An entry in the ring
#[derive(Clone, Debug)]
struct RingEntry {
	// The two first bytes of the first hash that goes in this partition
	// (the next bytes are zeroes)
	hash_prefix: u16,
	// The nodes that store this partition, stored as a list of positions in the `nodes`
	// field of the Ring structure
	// Only items 0 up to ring.replication_factor - 1 are used, others are zeros
	nodes_buf: [CompactNodeType; MAX_REPLICATION],
}

impl Ring {
	// TODO this function MUST be refactored, it's 100 lines long, with a 50 lines loop, going up to 6
	// levels of imbrication. It is basically impossible to test, maintain, or understand for an
	// outsider.
	pub(crate) fn new(config: NetworkConfig, replication_factor: usize) -> Self {
		// Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
		let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();

		let zones = config
			.members
			.iter()
			.filter(|(_id, info)| info.capacity.is_some())
			.map(|(_id, info)| info.zone.as_str())
			.collect::<HashSet<&str>>();
		let n_zones = zones.len();

		// Prepare ring
		let mut partitions: Vec<Vec<(&Uuid, &NetworkConfigEntry)>> = partitions_idx
			.iter()
			.map(|_i| Vec::new())
			.collect::<Vec<_>>();

		// Create MagLev priority queues for each node
		let mut queues = config
			.members
			.iter()
			.filter(|(_id, info)| info.capacity.is_some())
			.map(|(node_id, node_info)| {
				let mut parts = partitions_idx
					.iter()
					.map(|i| {
						let part_data =
							[&u16::to_be_bytes(*i as u16)[..], node_id.as_slice()].concat();
						(*i, fasthash(&part_data[..]))
					})
					.collect::<Vec<_>>();
				parts.sort_by_key(|(_i, h)| *h);
				let parts_i = parts.iter().map(|(i, _h)| *i).collect::<Vec<_>>();
				(node_id, node_info, parts_i, 0)
			})
			.collect::<Vec<_>>();

		let max_capacity = config
			.members
			.iter()
			.filter_map(|(_, node_info)| node_info.capacity)
			.fold(0, std::cmp::max);

		assert!(replication_factor <= MAX_REPLICATION);

		// Fill up ring
		for rep in 0..replication_factor {
			queues.sort_by_key(|(ni, _np, _q, _p)| {
				let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat();
				fasthash(&queue_data[..])
			});

			for (_, _, _, pos) in queues.iter_mut() {
				*pos = 0;
			}

			let mut remaining = partitions_idx.len();
			while remaining > 0 {
				let remaining0 = remaining;
				for i_round in 0..max_capacity {
					for (node_id, node_info, q, pos) in queues.iter_mut() {
						if i_round >= node_info.capacity.unwrap() {
							continue;
						}
						for (pos2, &qv) in q.iter().enumerate().skip(*pos) {
							if partitions[qv].len() != rep {
								continue;
							}
							let p_zns = partitions[qv]
								.iter()
								.map(|(_id, info)| info.zone.as_str())
								.collect::<HashSet<&str>>();
							if (p_zns.len() < n_zones && !p_zns.contains(&node_info.zone.as_str()))
								|| (p_zns.len() == n_zones
									&& !partitions[qv].iter().any(|(id, _i)| id == node_id))
							{
								partitions[qv].push((node_id, node_info));
								remaining -= 1;
								*pos = pos2 + 1;
								break;
							}
						}
					}
				}
				if remaining == remaining0 {
					// No progress made, exit
					warn!("Could not build ring, not enough nodes configured.");
					return Self {
						replication_factor,
						config,
						nodes: vec![],
						ring: vec![],
					};
				}
			}
		}

		// Make a canonical order for nodes
		let nodes = config
			.members
			.iter()
			.filter(|(_id, info)| info.capacity.is_some())
			.map(|(id, _)| *id)
			.collect::<Vec<_>>();
		let nodes_rev = nodes
			.iter()
			.enumerate()
			.map(|(i, id)| (*id, i as CompactNodeType))
			.collect::<HashMap<Uuid, CompactNodeType>>();

		let ring = partitions
			.iter()
			.enumerate()
			.map(|(i, nodes)| {
				let top = (i as u16) << (16 - PARTITION_BITS);
				let nodes = nodes
					.iter()
					.map(|(id, _info)| *nodes_rev.get(id).unwrap())
					.collect::<Vec<CompactNodeType>>();
				assert!(nodes.len() == replication_factor);
				let mut nodes_buf = [0u8; MAX_REPLICATION];
				nodes_buf[..replication_factor].copy_from_slice(&nodes[..]);
				RingEntry {
					hash_prefix: top,
					nodes_buf,
				}
			})
			.collect::<Vec<_>>();

		let nodes = nodes
			.iter()
			.map(|id| NodeID::from_slice(id.as_slice()).unwrap())
			.collect::<Vec<_>>();

		Self {
			replication_factor,
			config,
			nodes,
			ring,
		}
	}

	/// Get the partition in which data would fall on
	pub fn partition_of(&self, position: &Hash) -> Partition {
		let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap());
		top >> (16 - PARTITION_BITS)
	}

	/// Get the list of partitions and the first hash of a partition key that would fall in it
	pub fn partitions(&self) -> Vec<(Partition, Hash)> {
		let mut ret = vec![];

		for (i, entry) in self.ring.iter().enumerate() {
			let mut location = [0u8; 32];
			location[..2].copy_from_slice(&u16::to_be_bytes(entry.hash_prefix)[..]);
			ret.push((i as u16, location.into()));
		}
		if !ret.is_empty() {
			assert_eq!(ret[0].1, [0u8; 32].into());
		}

		ret
	}

	/// Walk the ring to find the n servers in which data should be replicated
	pub fn get_nodes(&self, position: &Hash, n: usize) -> Vec<NodeID> {
		if self.ring.len() != 1 << PARTITION_BITS {
			warn!("Ring not yet ready, read/writes will be lost!");
			return vec![];
		}

		let partition_idx = self.partition_of(position) as usize;
		let partition = &self.ring[partition_idx];

		let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap());
		// Check that we haven't messed up our partition table, i.e. that this partition
		// table entrey indeed corresponds to the item we are storing
		assert_eq!(
			partition.hash_prefix & PARTITION_MASK_U16,
			top & PARTITION_MASK_U16
		);

		assert!(n <= self.replication_factor);
		partition.nodes_buf[..n]
			.iter()
			.map(|i| self.nodes[*i as usize])
			.collect::<Vec<_>>()
	}
}

#[cfg(test)]
mod tests {
	use super::*;

	#[test]
	fn test_ring_entry_size() {
		assert_eq!(std::mem::size_of::<RingEntry>(), 8);
	}
}