use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::iter::{Iterator, Peekable};
use std::sync::Arc;

use hyper::{Body, Response};

use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::*;

use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::Version;

use garage_table::{EmptyKey, EnumerationOrder};

use crate::encoding::*;
use crate::error::*;
use crate::helpers::key_after_prefix;
use crate::s3::put as s3_put;
use crate::s3::xml as s3_xml;

const DUMMY_NAME: &str = "Dummy Key";
const DUMMY_KEY: &str = "GKDummyKey";

#[derive(Debug)]
pub struct ListQueryCommon {
	pub bucket_name: String,
	pub bucket_id: Uuid,
	pub delimiter: Option<String>,
	pub page_size: usize,
	pub prefix: String,
	pub urlencode_resp: bool,
}

#[derive(Debug)]
pub struct ListObjectsQuery {
	pub is_v2: bool,
	pub marker: Option<String>,
	pub continuation_token: Option<String>,
	pub start_after: Option<String>,
	pub common: ListQueryCommon,
}

#[derive(Debug)]
pub struct ListMultipartUploadsQuery {
	pub key_marker: Option<String>,
	pub upload_id_marker: Option<String>,
	pub common: ListQueryCommon,
}

#[derive(Debug)]
pub struct ListPartsQuery {
	pub bucket_name: String,
	pub bucket_id: Uuid,
	pub key: String,
	pub upload_id: String,
	pub part_number_marker: Option<u64>,
	pub max_parts: u64,
}

pub async fn handle_list(
	garage: Arc<Garage>,
	query: &ListObjectsQuery,
) -> Result<Response<Body>, Error> {
	let io = |bucket, key, count| {
		let t = &garage.object_table;
		async move {
			t.get_range(
				&bucket,
				key,
				Some(ObjectFilter::IsData),
				count,
				EnumerationOrder::Forward,
			)
			.await
		}
	};

	debug!("ListObjects {:?}", query);
	let mut acc = query.build_accumulator();
	let pagination = fetch_list_entries(&query.common, query.begin()?, &mut acc, &io).await?;

	let result = s3_xml::ListBucketResult {
		xmlns: (),
		// Sending back request information
		name: s3_xml::Value(query.common.bucket_name.to_string()),
		prefix: uriencode_maybe(&query.common.prefix, query.common.urlencode_resp),
		max_keys: s3_xml::IntValue(query.common.page_size as i64),
		delimiter: query
			.common
			.delimiter
			.as_ref()
			.map(|x| uriencode_maybe(x, query.common.urlencode_resp)),
		encoding_type: match query.common.urlencode_resp {
			true => Some(s3_xml::Value("url".to_string())),
			false => None,
		},
		marker: match (!query.is_v2, &query.marker) {
			(true, Some(k)) => Some(uriencode_maybe(k, query.common.urlencode_resp)),
			_ => None,
		},
		start_after: match (query.is_v2, &query.start_after) {
			(true, Some(sa)) => Some(uriencode_maybe(sa, query.common.urlencode_resp)),
			_ => None,
		},
		continuation_token: match (query.is_v2, &query.continuation_token) {
			(true, Some(ct)) => Some(s3_xml::Value(ct.to_string())),
			_ => None,
		},

		// Pagination
		is_truncated: s3_xml::Value(format!("{}", pagination.is_some())),
		key_count: Some(s3_xml::IntValue(
			acc.keys.len() as i64 + acc.common_prefixes.len() as i64,
		)),
		next_marker: match (!query.is_v2, &pagination) {
			(true, Some(RangeBegin::AfterKey { key: k }))
			| (
				true,
				Some(RangeBegin::IncludingKey {
					fallback_key: Some(k),
					..
				}),
			) => Some(uriencode_maybe(k, query.common.urlencode_resp)),
			_ => None,
		},
		next_continuation_token: match (query.is_v2, &pagination) {
			(true, Some(RangeBegin::AfterKey { key })) => Some(s3_xml::Value(format!(
				"]{}",
				base64::encode(key.as_bytes())
			))),
			(true, Some(RangeBegin::IncludingKey { key, .. })) => Some(s3_xml::Value(format!(
				"[{}",
				base64::encode(key.as_bytes())
			))),
			_ => None,
		},

		// Body
		contents: acc
			.keys
			.iter()
			.map(|(key, info)| s3_xml::ListBucketItem {
				key: uriencode_maybe(key, query.common.urlencode_resp),
				last_modified: s3_xml::Value(msec_to_rfc3339(info.last_modified)),
				size: s3_xml::IntValue(info.size as i64),
				etag: s3_xml::Value(format!("\"{}\"", info.etag)),
				storage_class: s3_xml::Value("STANDARD".to_string()),
			})
			.collect(),
		common_prefixes: acc
			.common_prefixes
			.iter()
			.map(|pfx| s3_xml::CommonPrefix {
				prefix: uriencode_maybe(pfx, query.common.urlencode_resp),
			})
			.collect(),
	};

	let xml = s3_xml::to_xml_with_header(&result)?;
	Ok(Response::builder()
		.header("Content-Type", "application/xml")
		.body(Body::from(xml.into_bytes()))?)
}

pub async fn handle_list_multipart_upload(
	garage: Arc<Garage>,
	query: &ListMultipartUploadsQuery,
) -> Result<Response<Body>, Error> {
	let io = |bucket, key, count| {
		let t = &garage.object_table;
		async move {
			t.get_range(
				&bucket,
				key,
				Some(ObjectFilter::IsUploading),
				count,
				EnumerationOrder::Forward,
			)
			.await
		}
	};

	debug!("ListMultipartUploads {:?}", query);
	let mut acc = query.build_accumulator();
	let pagination = fetch_list_entries(&query.common, query.begin()?, &mut acc, &io).await?;

	let result = s3_xml::ListMultipartUploadsResult {
		xmlns: (),

		// Sending back some information about the request
		bucket: s3_xml::Value(query.common.bucket_name.to_string()),
		prefix: uriencode_maybe(&query.common.prefix, query.common.urlencode_resp),
		delimiter: query
			.common
			.delimiter
			.as_ref()
			.map(|d| uriencode_maybe(d, query.common.urlencode_resp)),
		max_uploads: s3_xml::IntValue(query.common.page_size as i64),
		key_marker: query
			.key_marker
			.as_ref()
			.map(|m| uriencode_maybe(m, query.common.urlencode_resp)),
		upload_id_marker: query
			.upload_id_marker
			.as_ref()
			.map(|m| s3_xml::Value(m.to_string())),
		encoding_type: match query.common.urlencode_resp {
			true => Some(s3_xml::Value("url".to_string())),
			false => None,
		},

		// Handling pagination
		is_truncated: s3_xml::Value(format!("{}", pagination.is_some())),
		next_key_marker: match &pagination {
			None => None,
			Some(RangeBegin::AfterKey { key })
			| Some(RangeBegin::AfterUpload { key, .. })
			| Some(RangeBegin::IncludingKey { key, .. }) => {
				Some(uriencode_maybe(key, query.common.urlencode_resp))
			}
		},
		next_upload_id_marker: match pagination {
			Some(RangeBegin::AfterUpload { upload, .. }) => {
				Some(s3_xml::Value(hex::encode(upload)))
			}
			Some(RangeBegin::IncludingKey { .. }) => Some(s3_xml::Value("include".to_string())),
			_ => None,
		},

		// Result body
		upload: acc
			.keys
			.iter()
			.map(|(uuid, info)| s3_xml::ListMultipartItem {
				initiated: s3_xml::Value(msec_to_rfc3339(info.timestamp)),
				key: uriencode_maybe(&info.key, query.common.urlencode_resp),
				upload_id: s3_xml::Value(hex::encode(uuid)),
				storage_class: s3_xml::Value("STANDARD".to_string()),
				initiator: s3_xml::Initiator {
					display_name: s3_xml::Value(DUMMY_NAME.to_string()),
					id: s3_xml::Value(DUMMY_KEY.to_string()),
				},
				owner: s3_xml::Owner {
					display_name: s3_xml::Value(DUMMY_NAME.to_string()),
					id: s3_xml::Value(DUMMY_KEY.to_string()),
				},
			})
			.collect(),
		common_prefixes: acc
			.common_prefixes
			.iter()
			.map(|c| s3_xml::CommonPrefix {
				prefix: s3_xml::Value(c.to_string()),
			})
			.collect(),
	};

	let xml = s3_xml::to_xml_with_header(&result)?;

	Ok(Response::builder()
		.header("Content-Type", "application/xml")
		.body(Body::from(xml.into_bytes()))?)
}

pub async fn handle_list_parts(
	garage: Arc<Garage>,
	query: &ListPartsQuery,
) -> Result<Response<Body>, Error> {
	debug!("ListParts {:?}", query);

	let upload_id = s3_put::decode_upload_id(&query.upload_id)?;

	let (object, version) = futures::try_join!(
		garage.object_table.get(&query.bucket_id, &query.key),
		garage.version_table.get(&upload_id, &EmptyKey),
	)?;

	let (info, next) = fetch_part_info(query, object, version, upload_id)?;

	let result = s3_xml::ListPartsResult {
		xmlns: (),
		bucket: s3_xml::Value(query.bucket_name.to_string()),
		key: s3_xml::Value(query.key.to_string()),
		upload_id: s3_xml::Value(query.upload_id.to_string()),
		part_number_marker: query.part_number_marker.map(|e| s3_xml::IntValue(e as i64)),
		next_part_number_marker: next.map(|e| s3_xml::IntValue(e as i64)),
		max_parts: s3_xml::IntValue(query.max_parts as i64),
		is_truncated: s3_xml::Value(next.map(|_| "true").unwrap_or("false").to_string()),
		parts: info
			.iter()
			.map(|part| s3_xml::PartItem {
				etag: s3_xml::Value(format!("\"{}\"", part.etag)),
				last_modified: s3_xml::Value(msec_to_rfc3339(part.timestamp)),
				part_number: s3_xml::IntValue(part.part_number as i64),
				size: s3_xml::IntValue(part.size as i64),
			})
			.collect(),
		initiator: s3_xml::Initiator {
			display_name: s3_xml::Value(DUMMY_NAME.to_string()),
			id: s3_xml::Value(DUMMY_KEY.to_string()),
		},
		owner: s3_xml::Owner {
			display_name: s3_xml::Value(DUMMY_NAME.to_string()),
			id: s3_xml::Value(DUMMY_KEY.to_string()),
		},
		storage_class: s3_xml::Value("STANDARD".to_string()),
	};

	let xml = s3_xml::to_xml_with_header(&result)?;

	Ok(Response::builder()
		.header("Content-Type", "application/xml")
		.body(Body::from(xml.into_bytes()))?)
}

/*
 * Private enums and structs
 */

#[derive(Debug)]
struct ObjectInfo {
	last_modified: u64,
	size: u64,
	etag: String,
}

#[derive(Debug, PartialEq)]
struct UploadInfo {
	key: String,
	timestamp: u64,
}

#[derive(Debug, PartialEq)]
struct PartInfo {
	etag: String,
	timestamp: u64,
	part_number: u64,
	size: u64,
}

enum ExtractionResult {
	NoMore,
	Filled,
	FilledAtUpload {
		key: String,
		upload: Uuid,
	},
	Extracted {
		key: String,
	},
	// Fallback key is used for legacy APIs that only support
	// exlusive pagination (and not inclusive one).
	SkipTo {
		key: String,
		fallback_key: Option<String>,
	},
}

#[derive(PartialEq, Clone, Debug)]
enum RangeBegin {
	// Fallback key is used for legacy APIs that only support
	// exlusive pagination (and not inclusive one).
	IncludingKey {
		key: String,
		fallback_key: Option<String>,
	},
	AfterKey {
		key: String,
	},
	AfterUpload {
		key: String,
		upload: Uuid,
	},
}
type Pagination = Option<RangeBegin>;

/*
 * Fetch list entries
 */

async fn fetch_list_entries<R, F>(
	query: &ListQueryCommon,
	begin: RangeBegin,
	acc: &mut impl ExtractAccumulator,
	mut io: F,
) -> Result<Pagination, Error>
where
	R: futures::Future<Output = Result<Vec<Object>, GarageError>>,
	F: FnMut(Uuid, Option<String>, usize) -> R,
{
	let mut cursor = begin;
	// +1 is needed as we may need to skip the 1st key
	// (range is inclusive while most S3 requests are exclusive)
	let count = query.page_size + 1;

	loop {
		let start_key = match cursor {
			RangeBegin::AfterKey { ref key }
			| RangeBegin::AfterUpload { ref key, .. }
			| RangeBegin::IncludingKey { ref key, .. } => Some(key.clone()),
		};

		// Fetch objects
		let objects = io(query.bucket_id, start_key.clone(), count).await?;

		debug!(
			"List: get range {:?} (max {}), results: {}",
			start_key,
			count,
			objects.len()
		);
		let server_more = objects.len() >= count;

		let prev_req_cursor = cursor.clone();
		let mut iter = objects.iter().peekable();

		// Drop the first key if needed
		// Only AfterKey requires it according to the S3 spec and our implem.
		match (&cursor, iter.peek()) {
			(RangeBegin::AfterKey { key }, Some(object)) if &object.key == key => iter.next(),
			(_, _) => None,
		};

		while let Some(object) = iter.peek() {
			if !object.key.starts_with(&query.prefix) {
				// If the key is not in the requested prefix, we're done
				return Ok(None);
			}

			cursor = match acc.extract(query, &cursor, &mut iter) {
				ExtractionResult::Extracted { key } => RangeBegin::AfterKey { key },
				ExtractionResult::SkipTo { key, fallback_key } => {
					RangeBegin::IncludingKey { key, fallback_key }
				}
				ExtractionResult::FilledAtUpload { key, upload } => {
					return Ok(Some(RangeBegin::AfterUpload { key, upload }))
				}
				ExtractionResult::Filled => return Ok(Some(cursor)),
				ExtractionResult::NoMore => return Ok(None),
			};
		}

		if !server_more {
			// We did not fully fill the accumulator despite exhausting all the data we have,
			// we're done
			return Ok(None);
		}

		if prev_req_cursor == cursor {
			unreachable!("No progress has been done in the loop. This is a bug, please report it.");
		}
	}
}

fn fetch_part_info(
	query: &ListPartsQuery,
	object: Option<Object>,
	version: Option<Version>,
	upload_id: Uuid,
) -> Result<(Vec<PartInfo>, Option<u64>), Error> {
	// Check results
	let object = object.ok_or(Error::NoSuchKey)?;

	let obj_version = object
		.versions()
		.iter()
		.find(|v| v.uuid == upload_id && v.is_uploading())
		.ok_or(Error::NoSuchUpload)?;

	let version = version.ok_or(Error::NoSuchKey)?;

	// Cut the beginning of our 2 vectors if required
	let (etags, blocks) = match &query.part_number_marker {
		Some(marker) => {
			let next = marker + 1;

			let part_idx = into_ok_or_err(
				version
					.parts_etags
					.items()
					.binary_search_by(|(part_num, _)| part_num.cmp(&next)),
			);
			let parts = &version.parts_etags.items()[part_idx..];

			let block_idx = into_ok_or_err(
				version
					.blocks
					.items()
					.binary_search_by(|(vkey, _)| vkey.part_number.cmp(&next)),
			);
			let blocks = &version.blocks.items()[block_idx..];

			(parts, blocks)
		}
		None => (version.parts_etags.items(), version.blocks.items()),
	};

	// Use the block vector to compute a (part_number, size) vector
	let mut size = Vec::<(u64, u64)>::new();
	blocks.iter().for_each(|(key, val)| {
		let mut new_size = val.size;
		match size.pop() {
			Some((part_number, size)) if part_number == key.part_number => new_size += size,
			Some(v) => size.push(v),
			None => (),
		}
		size.push((key.part_number, new_size))
	});

	// Merge the etag vector and size vector to build a PartInfo vector
	let max_parts = query.max_parts as usize;
	let (mut etag_iter, mut size_iter) = (etags.iter().peekable(), size.iter().peekable());

	let mut info = Vec::<PartInfo>::with_capacity(max_parts);

	while info.len() < max_parts {
		match (etag_iter.peek(), size_iter.peek()) {
			(Some((ep, etag)), Some((sp, size))) => match ep.cmp(sp) {
				Ordering::Less => {
					debug!("ETag information ignored due to missing corresponding block information. Query: {:?}", query);
					etag_iter.next();
				}
				Ordering::Equal => {
					info.push(PartInfo {
						etag: etag.to_string(),
						timestamp: obj_version.timestamp,
						part_number: *ep,
						size: *size,
					});
					etag_iter.next();
					size_iter.next();
				}
				Ordering::Greater => {
					debug!("Block information ignored due to missing corresponding ETag information. Query: {:?}", query);
					size_iter.next();
				}
			},
			(None, None) => return Ok((info, None)),
			_ => {
				debug!(
					"Additional block or ETag information ignored. Query: {:?}",
					query
				);
				return Ok((info, None));
			}
		}
	}

	match info.last() {
		Some(part_info) => {
			let pagination = Some(part_info.part_number);
			Ok((info, pagination))
		}
		None => Ok((info, None)),
	}
}

/*
 * ListQuery logic
 */

/// Determine the key from where we want to start fetch objects from the database
///
/// We choose whether the object at this key must
/// be included or excluded from the response.
/// This key can be the prefix in the base case, or intermediate
/// points in the dataset if we are continuing a previous listing.
impl ListObjectsQuery {
	fn build_accumulator(&self) -> Accumulator<String, ObjectInfo> {
		Accumulator::<String, ObjectInfo>::new(self.common.page_size)
	}

	fn begin(&self) -> Result<RangeBegin, Error> {
		if self.is_v2 {
			match (&self.continuation_token, &self.start_after) {
				// In V2 mode, the continuation token is defined as an opaque
				// string in the spec, so we can do whatever we want with it.
				// In our case, it is defined as either [ or ] (for include
				// representing the key to start with.
				(Some(token), _) => match &token[..1] {
					"[" => Ok(RangeBegin::IncludingKey {
						key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?,
						fallback_key: None,
					}),
					"]" => Ok(RangeBegin::AfterKey {
						key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?,
					}),
					_ => Err(Error::BadRequest("Invalid continuation token".to_string())),
				},

				// StartAfter has defined semantics in the spec:
				// start listing at the first key immediately after.
				(_, Some(key)) => Ok(RangeBegin::AfterKey {
					key: key.to_string(),
				}),

				// In the case where neither is specified, we start
				// listing at the specified prefix. If an object has this
				// exact same key, we include it. (@TODO is this correct?)
				_ => Ok(RangeBegin::IncludingKey {
					key: self.common.prefix.to_string(),
					fallback_key: None,
				}),
			}
		} else {
			match &self.marker {
				// In V1 mode, the spec defines the Marker value to mean
				// the same thing as the StartAfter value in V2 mode.
				Some(key) => Ok(RangeBegin::AfterKey {
					key: key.to_string(),
				}),
				_ => Ok(RangeBegin::IncludingKey {
					key: self.common.prefix.to_string(),
					fallback_key: None,
				}),
			}
		}
	}
}

impl ListMultipartUploadsQuery {
	fn build_accumulator(&self) -> Accumulator<Uuid, UploadInfo> {
		Accumulator::<Uuid, UploadInfo>::new(self.common.page_size)
	}

	fn begin(&self) -> Result<RangeBegin, Error> {
		match (&self.upload_id_marker, &self.key_marker) {
			// If both the upload id marker and the key marker are sets,
			// the spec specifies that we must start listing uploads INCLUDING the given key,
			// AFTER the specified upload id (sorted in a lexicographic order).
			// To enable some optimisations, we emulate "IncludingKey" by extending the upload id
			// semantic. We base our reasoning on the hypothesis that S3's upload ids are opaques
			// while Garage's ones are 32 bytes hex encoded which enables us to extend this query
			// with a specific "include" upload id.
			(Some(up_marker), Some(key_marker)) => match &up_marker[..] {
				"include" => Ok(RangeBegin::IncludingKey {
					key: key_marker.to_string(),
					fallback_key: None,
				}),
				uuid => Ok(RangeBegin::AfterUpload {
					key: key_marker.to_string(),
					upload: s3_put::decode_upload_id(uuid)?,
				}),
			},

			// If only the key marker is specified, the spec says that we must start listing
			// uploads AFTER the specified key.
			(None, Some(key_marker)) => Ok(RangeBegin::AfterKey {
				key: key_marker.to_string(),
			}),
			_ => Ok(RangeBegin::IncludingKey {
				key: self.common.prefix.to_string(),
				fallback_key: None,
			}),
		}
	}
}

/*
 * Accumulator logic
 */

trait ExtractAccumulator {
	fn extract<'a>(
		&mut self,
		query: &ListQueryCommon,
		cursor: &RangeBegin,
		iter: &mut Peekable<impl Iterator<Item = &'a Object>>,
	) -> ExtractionResult;
}

struct Accumulator<K, V> {
	common_prefixes: BTreeSet<String>,
	keys: BTreeMap<K, V>,
	max_capacity: usize,
}

type ObjectAccumulator = Accumulator<String, ObjectInfo>;
type UploadAccumulator = Accumulator<Uuid, UploadInfo>;

impl<K: std::cmp::Ord, V> Accumulator<K, V> {
	fn new(page_size: usize) -> Accumulator<K, V> {
		Accumulator {
			common_prefixes: BTreeSet::<String>::new(),
			keys: BTreeMap::<K, V>::new(),
			max_capacity: page_size,
		}
	}

	/// Observe the Object iterator and try to extract a single common prefix
	///
	/// This function can consume an arbitrary number of items as long as they share the same
	/// common prefix.
	fn extract_common_prefix<'a>(
		&mut self,
		objects: &mut Peekable<impl Iterator<Item = &'a Object>>,
		query: &ListQueryCommon,
	) -> Option<ExtractionResult> {
		// Get the next object from the iterator
		let object = objects.peek().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it.");

		// Check if this is a common prefix (requires a passed delimiter and its value in the key)
		let pfx = match common_prefix(object, query) {
			Some(p) => p,
			None => return None,
		};

		// Try to register this prefix
		// If not possible, we can return early
		if !self.try_insert_common_prefix(pfx.to_string()) {
			return Some(ExtractionResult::Filled);
		}

		// We consume the whole common prefix from the iterator
		let mut last_pfx_key = &object.key;
		loop {
			last_pfx_key = match objects.peek() {
				Some(o) if o.key.starts_with(pfx) => &o.key,
				Some(_) => {
					return Some(ExtractionResult::Extracted {
						key: last_pfx_key.to_owned(),
					})
				}
				None => {
					return match key_after_prefix(pfx) {
						Some(next) => Some(ExtractionResult::SkipTo {
							key: next,
							fallback_key: Some(last_pfx_key.to_owned()),
						}),
						None => Some(ExtractionResult::NoMore),
					}
				}
			};

			objects.next();
		}
	}

	fn is_full(&mut self) -> bool {
		self.keys.len() + self.common_prefixes.len() >= self.max_capacity
	}

	fn try_insert_common_prefix(&mut self, key: String) -> bool {
		// If we already have an entry, we can continue
		if self.common_prefixes.contains(&key) {
			return true;
		}

		// Otherwise, we need to check if we can add it
		match self.is_full() {
			true => false,
			false => {
				self.common_prefixes.insert(key);
				true
			}
		}
	}

	fn try_insert_entry(&mut self, key: K, value: V) -> bool {
		// It is impossible to add twice a key, this is an error
		assert!(!self.keys.contains_key(&key));

		match self.is_full() {
			true => false,
			false => {
				self.keys.insert(key, value);
				true
			}
		}
	}
}

impl ExtractAccumulator for ObjectAccumulator {
	fn extract<'a>(
		&mut self,
		query: &ListQueryCommon,
		_cursor: &RangeBegin,
		objects: &mut Peekable<impl Iterator<Item = &'a Object>>,
	) -> ExtractionResult {
		if let Some(e) = self.extract_common_prefix(objects, query) {
			return e;
		}

		let object = objects.next().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it.");

		let version = match object.versions().iter().find(|x| x.is_data()) {
			Some(v) => v,
			None => unreachable!(
				"Expect to have objects having data due to earlier filtering. This is a logic bug."
			),
		};

		let meta = match &version.state {
			ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta,
			ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta,
			_ => unreachable!(),
		};
		let info = ObjectInfo {
			last_modified: version.timestamp,
			size: meta.size,
			etag: meta.etag.to_string(),
		};

		match self.try_insert_entry(object.key.clone(), info) {
			true => ExtractionResult::Extracted {
				key: object.key.clone(),
			},
			false => ExtractionResult::Filled,
		}
	}
}

impl ExtractAccumulator for UploadAccumulator {
	/// Observe the iterator, process a single key, and try to extract one or more upload entries
	///
	/// This function processes a single object from the iterator that can contain an arbitrary
	/// number of versions, and thus "uploads".
	fn extract<'a>(
		&mut self,
		query: &ListQueryCommon,
		cursor: &RangeBegin,
		objects: &mut Peekable<impl Iterator<Item = &'a Object>>,
	) -> ExtractionResult {
		if let Some(e) = self.extract_common_prefix(objects, query) {
			return e;
		}

		// Get the next object from the iterator
		let object = objects.next().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it.");

		let mut uploads_for_key = object
			.versions()
			.iter()
			.filter(|x| x.is_uploading())
			.collect::<Vec<&ObjectVersion>>();

		// S3 logic requires lexicographically sorted upload ids.
		uploads_for_key.sort_unstable_by_key(|e| e.uuid);

		// Skip results if an upload marker is provided
		if let RangeBegin::AfterUpload { upload, .. } = cursor {
			// Because our data are sorted, we can use a binary search to find the UUID
			// or to find where it should have been added. Once this position is found,
			// we use it to discard the first part of the array.
			let idx = match uploads_for_key.binary_search_by(|e| e.uuid.cmp(upload)) {
				// we start after the found uuid so we need to discard the pointed value.
				// In the worst case, the UUID is the last element, which lead us to an empty array
				// but we are never out of bound.
				Ok(i) => i + 1,
				// if the UUID is not found, the upload may have been discarded between the 2 request,
				// this function returns where it could have been inserted,
				// the pointed value is thus greater than our marker and we need to keep it.
				Err(i) => i,
			};
			uploads_for_key = uploads_for_key[idx..].to_vec();
		}

		let mut iter = uploads_for_key.iter();

		// The first entry is a specific case
		// as it changes our result enum type
		let first_upload = match iter.next() {
			Some(u) => u,
			None => {
				return ExtractionResult::Extracted {
					key: object.key.clone(),
				}
			}
		};
		let first_up_info = UploadInfo {
			key: object.key.to_string(),
			timestamp: first_upload.timestamp,
		};
		if !self.try_insert_entry(first_upload.uuid, first_up_info) {
			return ExtractionResult::Filled;
		}

		// We can then collect the remaining uploads in a loop
		let mut prev_uuid = first_upload.uuid;
		for upload in iter {
			let up_info = UploadInfo {
				key: object.key.to_string(),
				timestamp: upload.timestamp,
			};

			// Insert data in our accumulator
			// If it is full, return information to paginate.
			if !self.try_insert_entry(upload.uuid, up_info) {
				return ExtractionResult::FilledAtUpload {
					key: object.key.clone(),
					upload: prev_uuid,
				};
			}
			// Update our last added UUID
			prev_uuid = upload.uuid;
		}

		// We successfully collected all the uploads
		ExtractionResult::Extracted {
			key: object.key.clone(),
		}
	}
}

/*
 * Utility functions
 */

/// This is a stub for Result::into_ok_or_err that is not yet in Rust stable
fn into_ok_or_err<T>(r: Result<T, T>) -> T {
	match r {
		Ok(r) => r,
		Err(r) => r,
	}
}

/// Returns the common prefix of the object given the query prefix and delimiter
fn common_prefix<'a>(object: &'a Object, query: &ListQueryCommon) -> Option<&'a str> {
	match &query.delimiter {
		Some(delimiter) => object.key[query.prefix.len()..]
			.find(delimiter)
			.map(|i| &object.key[..query.prefix.len() + i + delimiter.len()]),
		None => None,
	}
}

/// URIencode a value if needed
fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value {
	if yes {
		s3_xml::Value(uri_encode(s, true))
	} else {
		s3_xml::Value(s.to_string())
	}
}

/*
 * Unit tests of this module
 */
#[cfg(test)]
mod tests {
	use super::*;
	use garage_model::s3::version_table::*;
	use garage_util::*;
	use std::iter::FromIterator;

	const TS: u64 = 1641394898314;

	fn bucket() -> Uuid {
		Uuid::from([0x42; 32])
	}

	fn query() -> ListMultipartUploadsQuery {
		ListMultipartUploadsQuery {
			common: ListQueryCommon {
				prefix: "".to_string(),
				delimiter: Some("/".to_string()),
				page_size: 1000,
				urlencode_resp: false,
				bucket_name: "a".to_string(),
				bucket_id: Uuid::from([0x00; 32]),
			},
			key_marker: None,
			upload_id_marker: None,
		}
	}

	fn objs() -> Vec<Object> {
		vec![
			Object::new(
				bucket(),
				"a/b/c".to_string(),
				vec![objup_version([0x01; 32])],
			),
			Object::new(bucket(), "d".to_string(), vec![objup_version([0x01; 32])]),
		]
	}

	fn objup_version(uuid: [u8; 32]) -> ObjectVersion {
		ObjectVersion {
			uuid: Uuid::from(uuid),
			timestamp: TS,
			state: ObjectVersionState::Uploading(ObjectVersionHeaders {
				content_type: "text/plain".to_string(),
				other: BTreeMap::<String, String>::new(),
			}),
		}
	}

	#[test]
	fn test_common_prefixes() {
		let mut query = query();
		let objs = objs();

		query.common.prefix = "a/".to_string();
		assert_eq!(
			common_prefix(objs.get(0).unwrap(), &query.common),
			Some("a/b/")
		);

		query.common.prefix = "a/b/".to_string();
		assert_eq!(common_prefix(objs.get(0).unwrap(), &query.common), None);
	}

	#[test]
	fn test_extract_common_prefix() {
		let mut query = query();
		query.common.prefix = "a/".to_string();
		let objs = objs();
		let mut acc = UploadAccumulator::new(query.common.page_size);

		let mut iter = objs.iter().peekable();
		match acc.extract_common_prefix(&mut iter, &query.common) {
			Some(ExtractionResult::Extracted { key }) => assert_eq!(key, "a/b/c".to_string()),
			_ => panic!("wrong result"),
		}
		assert_eq!(acc.common_prefixes.len(), 1);
		assert_eq!(acc.common_prefixes.iter().next().unwrap(), "a/b/");
	}

	#[test]
	fn test_extract_upload() {
		let objs = vec![
			Object::new(
				bucket(),
				"b".to_string(),
				vec![
					objup_version([0x01; 32]),
					objup_version([0x80; 32]),
					objup_version([0x8f; 32]),
					objup_version([0xdd; 32]),
				],
			),
			Object::new(bucket(), "c".to_string(), vec![]),
		];

		let mut acc = UploadAccumulator::new(2);
		let mut start = RangeBegin::AfterUpload {
			key: "b".to_string(),
			upload: Uuid::from([0x01; 32]),
		};

		let mut iter = objs.iter().peekable();

		// Check the case where we skip some uploads
		match acc.extract(&(query().common), &start, &mut iter) {
			ExtractionResult::FilledAtUpload { key, upload } => {
				assert_eq!(key, "b");
				assert_eq!(upload, Uuid::from([0x8f; 32]));
			}
			_ => panic!("wrong result"),
		};

		assert_eq!(acc.keys.len(), 2);
		assert_eq!(
			acc.keys.get(&Uuid::from([0x80; 32])).unwrap(),
			&UploadInfo {
				timestamp: TS,
				key: "b".to_string()
			}
		);
		assert_eq!(
			acc.keys.get(&Uuid::from([0x8f; 32])).unwrap(),
			&UploadInfo {
				timestamp: TS,
				key: "b".to_string()
			}
		);

		acc = UploadAccumulator::new(2);
		start = RangeBegin::AfterUpload {
			key: "b".to_string(),
			upload: Uuid::from([0xff; 32]),
		};
		iter = objs.iter().peekable();

		// Check the case where we skip all the uploads
		match acc.extract(&(query().common), &start, &mut iter) {
			ExtractionResult::Extracted { key } if key.as_str() == "b" => (),
			_ => panic!("wrong result"),
		};
	}

	#[tokio::test]
	async fn test_fetch_uploads_no_result() -> Result<(), Error> {
		let query = query();
		let mut acc = query.build_accumulator();
		let page = fetch_list_entries(
			&query.common,
			query.begin()?,
			&mut acc,
			|_, _, _| async move { Ok(vec![]) },
		)
		.await?;
		assert_eq!(page, None);
		assert_eq!(acc.common_prefixes.len(), 0);
		assert_eq!(acc.keys.len(), 0);

		Ok(())
	}

	#[tokio::test]
	async fn test_fetch_uploads_basic() -> Result<(), Error> {
		let query = query();
		let mut acc = query.build_accumulator();
		let mut fake_io = |_, _, _| async move { Ok(objs()) };
		let page =
			fetch_list_entries(&query.common, query.begin()?, &mut acc, &mut fake_io).await?;
		assert_eq!(page, None);
		assert_eq!(acc.common_prefixes.len(), 1);
		assert_eq!(acc.keys.len(), 1);
		assert!(acc.common_prefixes.contains("a/"));

		Ok(())
	}

	#[tokio::test]
	async fn test_fetch_uploads_advanced() -> Result<(), Error> {
		let mut query = query();
		query.common.page_size = 2;

		let mut fake_io = |_, k: Option<String>, _| async move {
			Ok(match k.as_deref() {
				Some("") => vec![
					Object::new(bucket(), "b/a".to_string(), vec![objup_version([0x01; 32])]),
					Object::new(bucket(), "b/b".to_string(), vec![objup_version([0x01; 32])]),
					Object::new(bucket(), "b/c".to_string(), vec![objup_version([0x01; 32])]),
				],
				Some("b0") => vec![
					Object::new(bucket(), "c/a".to_string(), vec![objup_version([0x01; 32])]),
					Object::new(bucket(), "c/b".to_string(), vec![objup_version([0x01; 32])]),
					Object::new(bucket(), "c/c".to_string(), vec![objup_version([0x02; 32])]),
				],
				Some("c0") => vec![Object::new(
					bucket(),
					"d".to_string(),
					vec![objup_version([0x01; 32])],
				)],
				_ => panic!("wrong value {:?}", k),
			})
		};

		let mut acc = query.build_accumulator();
		let page =
			fetch_list_entries(&query.common, query.begin()?, &mut acc, &mut fake_io).await?;
		assert_eq!(
			page,
			Some(RangeBegin::IncludingKey {
				key: "c0".to_string(),
				fallback_key: Some("c/c".to_string())
			})
		);
		assert_eq!(acc.common_prefixes.len(), 2);
		assert_eq!(acc.keys.len(), 0);
		assert!(acc.common_prefixes.contains("b/"));
		assert!(acc.common_prefixes.contains("c/"));

		Ok(())
	}

	fn version() -> Version {
		let uuid = Uuid::from([0x08; 32]);

		let blocks = vec![
			(
				VersionBlockKey {
					part_number: 1,
					offset: 1,
				},
				VersionBlock {
					hash: uuid,
					size: 3,
				},
			),
			(
				VersionBlockKey {
					part_number: 1,
					offset: 2,
				},
				VersionBlock {
					hash: uuid,
					size: 2,
				},
			),
			(
				VersionBlockKey {
					part_number: 2,
					offset: 1,
				},
				VersionBlock {
					hash: uuid,
					size: 8,
				},
			),
			(
				VersionBlockKey {
					part_number: 5,
					offset: 1,
				},
				VersionBlock {
					hash: uuid,
					size: 7,
				},
			),
			(
				VersionBlockKey {
					part_number: 8,
					offset: 1,
				},
				VersionBlock {
					hash: uuid,
					size: 5,
				},
			),
		];
		let etags = vec![
			(1, "etag1".to_string()),
			(3, "etag2".to_string()),
			(5, "etag3".to_string()),
			(8, "etag4".to_string()),
			(9, "etag5".to_string()),
		];

		Version {
			bucket_id: uuid,
			key: "a".to_string(),
			uuid,
			deleted: false.into(),
			blocks: crdt::Map::<VersionBlockKey, VersionBlock>::from_iter(blocks),
			parts_etags: crdt::Map::<u64, String>::from_iter(etags),
		}
	}

	fn obj() -> Object {
		Object::new(bucket(), "d".to_string(), vec![objup_version([0x08; 32])])
	}

	#[test]
	fn test_fetch_part_info() -> Result<(), Error> {
		let uuid = Uuid::from([0x08; 32]);
		let mut query = ListPartsQuery {
			bucket_name: "a".to_string(),
			bucket_id: uuid,
			key: "a".to_string(),
			upload_id: "xx".to_string(),
			part_number_marker: None,
			max_parts: 2,
		};

		assert!(
			fetch_part_info(&query, None, None, uuid).is_err(),
			"No object and version should fail"
		);
		assert!(
			fetch_part_info(&query, Some(obj()), None, uuid).is_err(),
			"No version should faild"
		);
		assert!(
			fetch_part_info(&query, None, Some(version()), uuid).is_err(),
			"No object should fail"
		);

		// Start from the beginning but with limited size to trigger pagination
		let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
		assert_eq!(pagination.unwrap(), 5);
		assert_eq!(
			info,
			vec![
				PartInfo {
					etag: "etag1".to_string(),
					timestamp: TS,
					part_number: 1,
					size: 5
				},
				PartInfo {
					etag: "etag3".to_string(),
					timestamp: TS,
					part_number: 5,
					size: 7
				},
			]
		);

		// Use previous pagination to make a new request
		query.part_number_marker = Some(pagination.unwrap());
		let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
		assert!(pagination.is_none());
		assert_eq!(
			info,
			vec![PartInfo {
				etag: "etag4".to_string(),
				timestamp: TS,
				part_number: 8,
				size: 5
			},]
		);

		// Trying to access a part that is way larger than registered ones
		query.part_number_marker = Some(9999);
		let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
		assert!(pagination.is_none());
		assert_eq!(info, vec![]);

		// Try without any limitation
		query.max_parts = 1000;
		query.part_number_marker = None;
		let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
		assert!(pagination.is_none());
		assert_eq!(
			info,
			vec![
				PartInfo {
					etag: "etag1".to_string(),
					timestamp: TS,
					part_number: 1,
					size: 5
				},
				PartInfo {
					etag: "etag3".to_string(),
					timestamp: TS,
					part_number: 5,
					size: 7
				},
				PartInfo {
					etag: "etag4".to_string(),
					timestamp: TS,
					part_number: 8,
					size: 5
				},
			]
		);

		Ok(())
	}
}