aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3_list.rs
blob: df9c3e6bed2534169e943201687cde8da8303342 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

use hyper::{Body, Response};

use garage_util::error::Error as GarageError;
use garage_util::time::*;

use garage_model::garage::Garage;
use garage_model::object_table::*;

use garage_table::DeletedFilter;

use crate::encoding::*;
use crate::error::*;
use crate::s3_xml;

/// Parameters of an S3 ListObjects (V1) or ListObjectsV2 request,
/// as consumed by `handle_list`.
#[derive(Debug)]
pub struct ListObjectsQuery {
	/// `true` to use the V2 pagination fields (`continuation_token`,
	/// `start_after`); `false` to use the V1 field (`marker`).
	pub is_v2: bool,
	/// Name of the bucket to list; used as the partition key when
	/// reading from the object table.
	pub bucket: String,
	/// If set, keys containing this string after the prefix are grouped
	/// into common prefixes instead of being listed individually.
	pub delimiter: Option<String>,
	/// Maximum number of entries (keys plus common prefixes) in a response.
	pub max_keys: usize,
	/// Only keys starting with this string are listed.
	pub prefix: String,
	/// V1 only: listing starts strictly after this key.
	pub marker: Option<String>,
	/// V2 only: token returned by a previous truncated listing. It is `[`
	/// (include the key) or `]` (exclude the key) followed by the
	/// base64-encoded key at which to resume.
	pub continuation_token: Option<String>,
	/// V2 only: listing starts strictly after this key. Ignored when a
	/// continuation token is present.
	pub start_after: Option<String>,
	/// Whether keys, prefixes, delimiter and markers are URI-encoded in the
	/// response (in which case the response also declares encoding type `url`).
	pub urlencode_resp: bool,
}

/// Metadata kept for each object that will be returned in the listing.
#[derive(Debug)]
struct ListResultInfo {
	/// Timestamp of the object version that is listed; rendered through
	/// `msec_to_rfc3339` when building the response.
	last_modified: u64,
	/// Object size, taken from the version's metadata.
	size: u64,
	/// Object ETag, taken from the version's metadata.
	etag: String,
}

pub async fn handle_list(
	garage: Arc<Garage>,
	query: &ListObjectsQuery,
) -> Result<Response<Body>, Error> {
	let mut result_keys = BTreeMap::<String, ListResultInfo>::new();
	let mut result_common_prefixes = BTreeSet::<String>::new();

	// Determine the key from where we want to start fetch objects
	// from the database, and whether the object at this key must
	// be included or excluded from the response.
	// This key can be the prefix in the base case, or intermediate
	// points in the dataset if we are continuing a previous listing.
	#[allow(clippy::collapsible_else_if)]
	let (mut next_chunk_start, mut next_chunk_exclude_start) = if query.is_v2 {
		if let Some(ct) = &query.continuation_token {
			// In V2 mode, the continuation token is defined as an opaque
			// string in the spec, so we can do whatever we want with it.
			// In our case, it is defined as either [ or ] (for include
			// and exclude, respectively), followed by a base64 string
			// representing the key to start with.
			let exclude = match &ct[..1] {
				"[" => false,
				"]" => true,
				_ => return Err(Error::BadRequest("Invalid continuation token".to_string())),
			};
			(
				String::from_utf8(base64::decode(ct[1..].as_bytes())?)?,
				exclude,
			)
		} else if let Some(sa) = &query.start_after {
			// StartAfter has defined semantics in the spec:
			// start listing at the first key immediately after.
			(sa.clone(), true)
		} else {
			// In the case where neither is specified, we start
			// listing at the specified prefix. If an object has this
			// exact same key, we include it. (TODO is this correct?)
			(query.prefix.clone(), false)
		}
	} else {
		if let Some(mk) = &query.marker {
			// In V1 mode, the spec defines the Marker value to mean
			// the same thing as the StartAfter value in V2 mode.
			(mk.clone(), true)
		} else {
			// Base case, same as in V2 mode
			(query.prefix.clone(), false)
		}
	};

	debug!(
		"List request: `{:?}` {} `{}`, start from {}, exclude first {}",
		query.delimiter, query.max_keys, query.prefix, next_chunk_start, next_chunk_exclude_start
	);

	// `truncated` is a boolean that determines whether there are
	// more items to be added.
	let truncated;
	// `last_processed_item` is the key of the last item
	// that was included in the listing before truncating.
	let mut last_processed_item = None;

	'query_loop: loop {
		// Fetch objects
		let objects = garage
			.object_table
			.get_range(
				&query.bucket,
				Some(next_chunk_start.clone()),
				Some(DeletedFilter::NotDeleted),
				query.max_keys + 1,
			)
			.await?;
		debug!(
			"List: get range {} (max {}), results: {}",
			next_chunk_start,
			query.max_keys + 1,
			objects.len()
		);
		let current_chunk_start = next_chunk_start.clone();

		// Iterate on returned objects and add them to the response.
		// If a delimiter is specified, we take care of grouping objects
		// into CommonPrefixes.
		for object in objects.iter() {
			// If we have retrieved an object that doesn't start with
			// the prefix, we know we have finished listing our stuff.
			if !object.key.starts_with(&query.prefix) {
				truncated = false;
				break 'query_loop;
			}

			// Exclude the starting key if we have to.
			if object.key == next_chunk_start && next_chunk_exclude_start {
				continue;
			}

			// Find if this object has a currently valid (non-deleted,
			// non-still-uploading) version. If not, skip it.
			let version = match object.versions().iter().find(|x| x.is_data()) {
				Some(v) => v,
				None => continue,
			};

			// If we don't have space to add this object to our response,
			// we will need to stop here and mark the key of this object
			// as the marker from where
			// we want to start again in the next list call.
			let cannot_add = result_keys.len() + result_common_prefixes.len() >= query.max_keys;

			// Determine whether this object should be grouped inside
			// a CommonPrefix because it contains the delimiter,
			// or if it should be returned as an object.
			let common_prefix = match &query.delimiter {
				Some(delimiter) => object.key[query.prefix.len()..]
					.find(delimiter)
					.map(|i| &object.key[..query.prefix.len() + i + delimiter.len()]),
				None => None,
			};
			if let Some(pfx) = common_prefix {
				// In the case where this object must be grouped in a
				// common prefix, handle it here.
				if !result_common_prefixes.contains(pfx) {
					// Determine the first listing key that starts after
					// the common prefix, by finding the next possible
					// string by alphabetical order.
					let mut first_key_after_prefix = pfx.to_string();
					let tail = first_key_after_prefix.pop().unwrap();
					first_key_after_prefix.push(((tail as u8) + 1) as char);

					// If this were the end of the chunk,
					// the next chunk should start after this prefix
					next_chunk_start = first_key_after_prefix;
					next_chunk_exclude_start = false;

					if cannot_add {
						truncated = true;
						break 'query_loop;
					}
					result_common_prefixes.insert(pfx.to_string());
				}
				last_processed_item = Some(object.key.clone());
				continue;
			};

			// This is not a common prefix, we want to add it to our
			// response directly.
			next_chunk_start = object.key.clone();

			if cannot_add {
				truncated = true;
				next_chunk_exclude_start = false;
				break 'query_loop;
			}

			let meta = match &version.state {
				ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta,
				ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta,
				_ => unreachable!(),
			};
			let info = match result_keys.get(&object.key) {
				None => ListResultInfo {
					last_modified: version.timestamp,
					size: meta.size,
					etag: meta.etag.to_string(),
				},
				Some(_lri) => {
					return Err(Error::InternalError(GarageError::Message(format!(
						"Duplicate key?? {} (this is a bug, please report it)",
						object.key
					))))
				}
			};
			result_keys.insert(object.key.clone(), info);
			last_processed_item = Some(object.key.clone());
			next_chunk_exclude_start = true;
		}

		// If our database returned less objects than what we were asking for,
		// it means that no more objects are in the bucket. So we stop here.
		if objects.len() < query.max_keys + 1 {
			truncated = false;
			break 'query_loop;
		}

		// Sanity check: we should have added at least an object
		// or a prefix to our returned result.
		if next_chunk_start == current_chunk_start || last_processed_item.is_none() {
			return Err(Error::InternalError(GarageError::Message(format!(
							"S3 ListObject: made no progress, still starting at {} (this is a bug, please report it)", next_chunk_start))));
		}

		// Loop and fetch more objects
	}

	let mut result = s3_xml::ListBucketResult {
		xmlns: (),
		name: s3_xml::Value(query.bucket.to_string()),
		prefix: uriencode_maybe(&query.prefix, query.urlencode_resp),
		marker: None,
		next_marker: None,
		start_after: None,
		continuation_token: None,
		next_continuation_token: None,
		max_keys: s3_xml::IntValue(query.max_keys as i64),
		delimiter: query
			.delimiter
			.as_ref()
			.map(|x| uriencode_maybe(x, query.urlencode_resp)),
		encoding_type: match query.urlencode_resp {
			true => Some(s3_xml::Value("url".to_string())),
			false => None,
		},
		key_count: Some(s3_xml::IntValue(
			result_keys.len() as i64 + result_common_prefixes.len() as i64,
		)),
		is_truncated: s3_xml::Value(format!("{}", truncated)),
		contents: vec![],
		common_prefixes: vec![],
	};

	if query.is_v2 {
		if let Some(ct) = &query.continuation_token {
			result.continuation_token = Some(s3_xml::Value(ct.to_string()));
		}
		if let Some(sa) = &query.start_after {
			result.start_after = Some(uriencode_maybe(sa, query.urlencode_resp));
		}
		if truncated {
			let b64 = base64::encode(next_chunk_start.as_bytes());
			let nct = if next_chunk_exclude_start {
				format!("]{}", b64)
			} else {
				format!("[{}", b64)
			};
			result.next_continuation_token = Some(s3_xml::Value(nct));
		}
	} else {
		// TODO: are these supposed to be urlencoded when encoding-type is URL??
		if let Some(mkr) = &query.marker {
			result.marker = Some(uriencode_maybe(mkr, query.urlencode_resp));
		}
		if truncated {
			if let Some(lpi) = last_processed_item {
				result.next_marker = Some(uriencode_maybe(&lpi, query.urlencode_resp));
			} else {
				return Err(Error::InternalError(GarageError::Message(
								"S3 ListObject: last_processed_item is None but the response was truncated, indicating that many items were processed (this is a bug, please report it)".to_string())));
			}
		}
	}

	for (key, info) in result_keys.iter() {
		result.contents.push(s3_xml::ListBucketItem {
			key: uriencode_maybe(key, query.urlencode_resp),
			last_modified: s3_xml::Value(msec_to_rfc3339(info.last_modified)),
			size: s3_xml::IntValue(info.size as i64),
			etag: s3_xml::Value(info.etag.to_string()),
			storage_class: s3_xml::Value("STANDARD".to_string()),
		});
	}

	for pfx in result_common_prefixes.iter() {
		result.common_prefixes.push(s3_xml::CommonPrefix {
			prefix: uriencode_maybe(pfx, query.urlencode_resp),
		});
	}

	let xml = s3_xml::to_xml_with_header(&result)?;

	Ok(Response::builder()
		.header("Content-Type", "application/xml")
		.body(Body::from(xml.into_bytes()))?)
}

/// Wraps `s` into an XML value, URI-encoding it first when `yes` is set
/// and copying it verbatim otherwise.
fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value {
	let text = match yes {
		true => uri_encode(s, true),
		false => s.to_string(),
	};
	s3_xml::Value(text)
}