//! Module containing types related to computing the nodes that should receive a copy of
//! data blocks and metadata
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;

use log::warn;
use serde::{Deserialize, Serialize};

use garage_util::data::*;

/// A partition id, stored as a 16-bit integer,
/// i.e. there can be up to 2**16 partitions.
/// (in practice, there are exactly 2**PARTITION_BITS partitions)
pub type Partition = u16;

// TODO: make this constant configurable in the config file
// For deployments with many nodes it might make sense to bump
// it up to 10.
// Maximum value: 16
/// How many bits of the hash are used to select a partition. A higher value gives a fairer
/// distribution in the presence of numerous nodes, but an exponentially bigger ring. Max 16.
pub const PARTITION_BITS: usize = 8;

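// The mask of the partition bits within the first 16 bits of a hash.
// With PARTITION_BITS = 8, this evaluates to 0xFF00: the partition of a hash
// is entirely determined by its first byte.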
const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);

/// The user-defined configuration of the cluster's nodes
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfig {
	/// Map of each node's id to its configuration
	pub members: HashMap<Uuid, NetworkConfigEntry>,
	/// Version of this config
	pub version: u64,
}

impl NetworkConfig {
	pub(crate) fn new() -> Self {
		Self {
			members: HashMap::new(),
			version: 0,
		}
	}

	pub(crate) fn migrate_from_021(old: garage_rpc_021::ring::NetworkConfig) -> Self {
		let members = old
			.members
			.into_iter()
			.map(|(id, conf)| {
				(
					Hash::try_from(id.as_slice()).unwrap(),
					NetworkConfigEntry {
						zone: conf.datacenter,
						capacity: if conf.capacity == 0 {
							None
						} else {
							Some(conf.capacity)
						},
						tag: conf.tag,
					},
				)
			})
			.collect();
		Self {
			members,
			version: old.version,
		}
	}
}

/// The overall configuration of one (possibly remote) node
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfigEntry {
	/// Datacenter to which this node belongs. This information may be used to achieve better
	/// geodistribution
	pub zone: String,
	/// The (relative) capacity of the node
	/// If this is set to None, the node does not participate in storing data for the system
	/// and is only active as an API gateway to other nodes
	pub capacity: Option<u32>,
	/// A tag to identify the node, used only for display purposes
	pub tag: String,
}

impl NetworkConfigEntry {
	pub fn capacity_string(&self) -> String {
		match self.capacity {
			Some(c) => format!("{}", c),
			None => "gateway".to_string(),
		}
	}
}

/// A ring that distributes objects fairly between nodes
#[derive(Clone)]
pub struct Ring {
	/// The replication factor for this ring
	pub replication_factor: usize,

	/// The network configuration used to generate this ring
	pub config: NetworkConfig,

	// Internal order of nodes used to make a more compact representation of the ring
	nodes: Vec<Uuid>,

	// The list of entries in the ring
	ring: Vec<RingEntry>,
}

// Type used to store the id of a node compactly
// Change this to u16 the day we want to have more than 256 nodes in a cluster
type CompactNodeType = u8;

// The maximum number of times an object might get replicated
// This must be at least 3 because Garage supports 3-way replication
// Here we use 6 so that the size of a ring entry is 8 bytes
// (2 bytes partition id, 6 bytes node numbers as u8s)
const MAX_REPLICATION: usize = 6;

/// An entry in the ring
#[derive(Clone, Debug)]
struct RingEntry {
	// The first two bytes of the first hash that falls in this partition
	// (the remaining bytes are zeroes)
	hash_prefix: u16,
	// The nodes that store this partition, stored as a list of positions in the `nodes`
	// field of the Ring structure
	// Only items 0 up to ring.replication_factor - 1 are used, others are zeros
	nodes_buf: [CompactNodeType; MAX_REPLICATION],
}

impl Ring {
	// TODO this function MUST be refactored: it's 100 lines long, with a 50-line loop nested up
	// to 6 levels deep. It is basically impossible for an outsider to test, maintain, or
	// understand.
	pub(crate) fn new(config: NetworkConfig, replication_factor: usize) -> Self {
		// Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
		let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();

		let zones = config
			.members
			.iter()
			.filter(|(_id, info)| info.capacity.is_some())
			.map(|(_id, info)| info.zone.as_str())
			.collect::<HashSet<&str>>();
		let n_zones = zones.len();

		// Prepare ring
		let mut partitions: Vec<Vec<(&Uuid, &NetworkConfigEntry)>> = partitions_idx
			.iter()
			.map(|_i| Vec::new())
			.collect::<Vec<_>>();

		// Create MagLev priority queues for each node: each node is given a
		// pseudo-random permutation of the partitions, obtained by sorting the
		// partitions by the hash of (partition index, node id)
		let mut queues = config
			.members
			.iter()
			.filter(|(_id, info)| info.capacity.is_some())
			.map(|(node_id, node_info)| {
				let mut parts = partitions_idx
					.iter()
					.map(|i| {
						let part_data =
							[&u16::to_be_bytes(*i as u16)[..], node_id.as_slice()].concat();
						(*i, fasthash(&part_data[..]))
					})
					.collect::<Vec<_>>();
				parts.sort_by_key(|(_i, h)| *h);
				let parts_i = parts.iter().map(|(i, _h)| *i).collect::<Vec<_>>();
				(node_id, node_info, parts_i, 0)
			})
			.collect::<Vec<_>>();

		let max_capacity = config
			.members
			.iter()
			.filter_map(|(_, node_info)| node_info.capacity)
			.fold(0, std::cmp::max);

		assert!(replication_factor <= MAX_REPLICATION);

		// Fill up ring
		for rep in 0..replication_factor {
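			// Pseudo-randomly shuffle the order in which the nodes are considered
			// at this replication level, so that the same nodes don't always get
			// first pick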
			queues.sort_by_key(|(ni, _np, _q, _p)| {
				let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat();
				fasthash(&queue_data[..])
			});

			for (_, _, _, pos) in queues.iter_mut() {
				*pos = 0;
			}

			let mut remaining = partitions_idx.len();
			while remaining > 0 {
				let remaining0 = remaining;
				for i_round in 0..max_capacity {
					for (node_id, node_info, q, pos) in queues.iter_mut() {
						if i_round >= node_info.capacity.unwrap() {
							continue;
						}
						for (pos2, &qv) in q.iter().enumerate().skip(*pos) {
							if partitions[qv].len() != rep {
								continue;
							}
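							// Accept the node into this partition either if it brings a new
							// zone, or if all zones are already represented in the partition
							// and the node itself is not yet one of its replicas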
							let p_zns = partitions[qv]
								.iter()
								.map(|(_id, info)| info.zone.as_str())
								.collect::<HashSet<&str>>();
							if (p_zns.len() < n_zones && !p_zns.contains(&node_info.zone.as_str()))
								|| (p_zns.len() == n_zones
									&& !partitions[qv].iter().any(|(id, _i)| id == node_id))
							{
								partitions[qv].push((node_id, node_info));
								remaining -= 1;
								*pos = pos2 + 1;
								break;
							}
						}
					}
				}
				if remaining == remaining0 {
					// No progress made, exit
					warn!("Could not build ring, not enough nodes configured.");
					return Self {
						replication_factor,
						config,
						nodes: vec![],
						ring: vec![],
					};
				}
			}
		}

		// Make a canonical order for nodes
		let nodes = config
			.members
			.iter()
			.filter(|(_id, info)| info.capacity.is_some())
			.map(|(id, _)| *id)
			.collect::<Vec<_>>();
		let nodes_rev = nodes
			.iter()
			.enumerate()
			.map(|(i, id)| (*id, i as CompactNodeType))
			.collect::<HashMap<Uuid, CompactNodeType>>();

		let ring = partitions
			.iter()
			.enumerate()
			.map(|(i, nodes)| {
				let top = (i as u16) << (16 - PARTITION_BITS);
				let nodes = nodes
					.iter()
					.map(|(id, _info)| *nodes_rev.get(id).unwrap())
					.collect::<Vec<CompactNodeType>>();
				assert!(nodes.len() == replication_factor);
				let mut nodes_buf = [0u8; MAX_REPLICATION];
				nodes_buf[..replication_factor].copy_from_slice(&nodes[..]);
				RingEntry {
					hash_prefix: top,
					nodes_buf,
				}
			})
			.collect::<Vec<_>>();

		Self {
			replication_factor,
			config,
			nodes,
			ring,
		}
	}

	/// Get the partition in which the given hash falls
	pub fn partition_of(&self, position: &Hash) -> Partition {
		let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap());
		top >> (16 - PARTITION_BITS)
	}

	/// Get the list of partitions and, for each of them, the first hash that falls in it
	pub fn partitions(&self) -> Vec<(Partition, Hash)> {
		let mut ret = vec![];

		for (i, entry) in self.ring.iter().enumerate() {
			let mut location = [0u8; 32];
			location[..2].copy_from_slice(&u16::to_be_bytes(entry.hash_prefix)[..]);
			ret.push((i as u16, location.into()));
		}
		if !ret.is_empty() {
			assert_eq!(ret[0].1, [0u8; 32].into());
		}

		ret
	}

	/// Walk the ring to find the n servers on which data should be replicated
	pub fn get_nodes(&self, position: &Hash, n: usize) -> Vec<Uuid> {
		if self.ring.len() != 1 << PARTITION_BITS {
			warn!("Ring not yet ready, read/writes will be lost!");
			return vec![];
		}

		let partition_idx = self.partition_of(position) as usize;
		let partition = &self.ring[partition_idx];

		let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap());
		// Check that we haven't messed up our partition table, i.e. that this partition
		// table entry indeed corresponds to the item we are storing
		assert_eq!(
			partition.hash_prefix & PARTITION_MASK_U16,
			top & PARTITION_MASK_U16
		);

		assert!(n <= self.replication_factor);
		partition.nodes_buf[..n]
			.iter()
			.map(|i| self.nodes[*i as usize])
			.collect::<Vec<_>>()
	}
}

#[cfg(test)]
mod tests {
	use super::*;

	#[test]
	fn test_ring_entry_size() {
		assert_eq!(std::mem::size_of::<RingEntry>(), 8);
	}
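
	#[test]
	fn test_capacity_string() {
		// Minimal check of the display helper: a node with no capacity is
		// shown as a gateway, a storage node shows its capacity
		// (zone and tag values are arbitrary)
		let gateway = NetworkConfigEntry {
			zone: "dc1".to_string(),
			capacity: None,
			tag: "gw1".to_string(),
		};
		assert_eq!(gateway.capacity_string(), "gateway");

		let storage = NetworkConfigEntry {
			zone: "dc1".to_string(),
			capacity: Some(10),
			tag: "node1".to_string(),
		};
		assert_eq!(storage.capacity_string(), "10");
	}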
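
	#[test]
	fn test_ring_single_node() {
		// Sketch of the ring's behavior on a minimal cluster made of a single
		// storage node (the node id, zone and tag are arbitrary test values)
		let node_id = Uuid::from([0x11u8; 32]);
		let mut members = HashMap::new();
		members.insert(
			node_id,
			NetworkConfigEntry {
				zone: "dc1".to_string(),
				capacity: Some(1),
				tag: "node1".to_string(),
			},
		);
		let config = NetworkConfig {
			members,
			version: 1,
		};
		let ring = Ring::new(config, 1);

		// The ring always contains exactly 2**PARTITION_BITS partitions
		assert_eq!(ring.partitions().len(), 1 << PARTITION_BITS);

		// With PARTITION_BITS = 8, the partition of a hash is its first byte
		let mut pos = [0u8; 32];
		pos[0] = 0x12;
		let pos: Hash = pos.into();
		assert_eq!(ring.partition_of(&pos), 0x12);

		// The only node of the cluster stores a replica of everything
		assert_eq!(ring.get_nodes(&pos, 1), vec![node_id]);
	}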
}