use garage_util::crdt::Crdt;
use garage_util::data::*;
use garage_util::error::*;

use garage_rpc::layout::*;
use garage_rpc::system::*;
use garage_rpc::*;

use crate::cli::*;

/// Routes a parsed layout subcommand to the function that implements it.
///
/// Every handler receives the same RPC endpoint and target node; the result
/// of the selected handler is returned unchanged.
pub async fn cli_layout_command_dispatch(
	cmd: LayoutOperation,
	system_rpc_endpoint: &Endpoint<SystemRpc, ()>,
	rpc_host: NodeID,
) -> Result<(), Error> {
	let endpoint = system_rpc_endpoint;
	match cmd {
		LayoutOperation::Show => cmd_show_layout(endpoint, rpc_host).await,
		LayoutOperation::Assign(opt) => cmd_assign_role(endpoint, rpc_host, opt).await,
		LayoutOperation::Remove(opt) => cmd_remove_role(endpoint, rpc_host, opt).await,
		LayoutOperation::Apply(opt) => cmd_apply_layout(endpoint, rpc_host, opt).await,
		LayoutOperation::Revert(opt) => cmd_revert_layout(endpoint, rpc_host, opt).await,
	}
}

/// Stages a role assignment (zone, capacity or gateway status, tags) for a node.
///
/// The target node is looked up with `find_matching_node` among the nodes
/// returned by the `GetKnownNodes` RPC. Each entry of `args.replace` is looked
/// up among the layout's node ids and staged for removal. The resulting role
/// change is merged into `layout.staging` and the layout is sent back to
/// `rpc_host`; nothing is applied until `garage layout apply` is run.
///
/// When the node already has a role (current roles merged with staged
/// changes), values not given on the command line are inherited from it.
///
/// # Errors
///
/// Returns an error if:
/// - both a capacity and the gateway flag are given, or the capacity is 0;
/// - an RPC call fails or returns an unexpected response;
/// - `find_matching_node` fails for the target or a replaced node;
/// - a node listed in `args.replace` has no role in the planned layout;
/// - the node has no existing role and the zone, or both the capacity and
///   the gateway flag, are missing.
pub async fn cmd_assign_role(
	rpc_cli: &Endpoint<SystemRpc, ()>,
	rpc_host: NodeID,
	args: AssignRoleOpt,
) -> Result<(), Error> {
	// These checks depend only on the command line, so reject bad flag
	// combinations before spending any RPC round-trip on the cluster.
	if args.capacity.is_some() && args.gateway {
		return Err(Error::Message(
				"-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into()));
	}
	if args.capacity == Some(0) {
		return Err(Error::Message("Invalid capacity value: 0".into()));
	}

	let status = match rpc_cli
		.call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
		.await??
	{
		SystemRpc::ReturnKnownNodes(nodes) => nodes,
		resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
	};

	let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?;

	let mut layout = fetch_layout(rpc_cli, rpc_host).await?;

	// Planned view of the roles: current roles with staged changes merged in.
	let mut roles = layout.roles.clone();
	roles.merge(&layout.staging);

	// Stage the removal of every node that the new node replaces.
	for replaced in args.replace.iter() {
		let replaced_node = find_matching_node(layout.node_ids().iter().cloned(), replaced)?;
		match roles.get(&replaced_node) {
			Some(NodeRoleV(Some(_))) => {
				layout
					.staging
					.merge(&roles.update_mutator(replaced_node, NodeRoleV(None)));
			}
			_ => {
				return Err(Error::Message(format!(
					"Cannot replace node {:?} as it is not currently in planned layout",
					replaced_node
				)));
			}
		}
	}

	let new_entry = match roles.get(&added_node) {
		// The node already has a role: only override what was specified.
		Some(NodeRoleV(Some(old))) => {
			let capacity = match args.capacity {
				Some(c) => Some(c),
				// -g turns the node into a gateway, i.e. no capacity.
				None if args.gateway => None,
				None => old.capacity,
			};
			let tags = if args.tags.is_empty() {
				old.tags.clone()
			} else {
				args.tags
			};
			NodeRole {
				zone: args.zone.unwrap_or_else(|| old.zone.to_string()),
				capacity,
				tags,
			}
		}
		// No previous role to inherit from: zone and capacity/gateway are mandatory.
		_ => {
			let capacity = match args.capacity {
				Some(c) => Some(c),
				None if args.gateway => None,
				None => return Err(Error::Message(
						"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
			};
			NodeRole {
				zone: args.zone.ok_or("Please specify a zone with the -z flag")?,
				capacity,
				tags: args.tags,
			}
		}
	};

	layout
		.staging
		.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));

	send_layout(rpc_cli, rpc_host, layout).await?;

	println!("Role change is staged but not yet committed.");
	println!("Use `garage layout show` to view staged role changes,");
	println!("and `garage layout apply` to enact staged changes.");
	Ok(())
}

/// Stages the removal of a node's role from the cluster layout.
///
/// The node is looked up with `find_matching_node` among the ids present in
/// the planned roles (current roles merged with staged changes). A `None`
/// role for that node is merged into `layout.staging` and the layout is sent
/// back to `rpc_host`; nothing is applied until `garage layout apply` is run.
///
/// # Errors
///
/// Returns an error if fetching or sending the layout fails, or if
/// `find_matching_node` fails for `args.node_id`.
pub async fn cmd_remove_role(
	rpc_cli: &Endpoint<SystemRpc, ()>,
	rpc_host: NodeID,
	args: RemoveRoleOpt,
) -> Result<(), Error> {
	let mut layout = fetch_layout(rpc_cli, rpc_host).await?;

	// Planned view of the roles: current roles with staged changes merged in.
	let mut roles = layout.roles.clone();
	roles.merge(&layout.staging);

	let deleted_node =
		find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?;

	layout
		.staging
		.merge(&roles.update_mutator(deleted_node, NodeRoleV(None)));

	send_layout(rpc_cli, rpc_host, layout).await?;

	println!("Role removal is staged but not yet committed.");
	println!("Use `garage layout show` to view staged role changes,");
	println!("and `garage layout apply` to enact staged changes.");
	Ok(())
}

/// Prints the current cluster layout and its version, then any staged role
/// changes together with a preview of the layout they would produce.
///
/// The preview is computed on the locally fetched copy only; nothing is sent
/// back to the cluster.
///
/// # Errors
///
/// Returns an error if the layout cannot be fetched from `rpc_host`.
pub async fn cmd_show_layout(
	rpc_cli: &Endpoint<SystemRpc, ()>,
	rpc_host: NodeID,
) -> Result<(), Error> {
	let mut layout = fetch_layout(rpc_cli, rpc_host).await?;

	println!("==== CURRENT CLUSTER LAYOUT ====");
	let has_current_roles = print_cluster_layout(&layout);
	if !has_current_roles {
		println!("No nodes currently have a role in the cluster.");
		println!("See `garage status` to view available nodes.");
	}
	println!();
	println!("Current cluster layout version: {}", layout.version);

	// Without staged changes there is no preview to display.
	if !print_staging_role_changes(&layout) {
		return Ok(());
	}

	// Fold the staged changes into the local copy to show the outcome.
	layout.roles.merge(&layout.staging);

	println!();
	println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ====");
	let has_new_roles = print_cluster_layout(&layout);
	if !has_new_roles {
		println!("No nodes have a role in the new layout.");
	}
	println!();

	// this will print the stats of what partitions
	// will move around when we apply
	let assignable = layout.calculate_partition_assignation();
	if !assignable {
		println!("Not enough nodes have an assigned role to maintain enough copies of data.");
		println!("This new layout cannot yet be applied.");
		return Ok(());
	}

	let next_version = layout.version + 1;
	println!("To enact the staged role changes, type:");
	println!();
	println!("    garage layout apply --version {}", next_version);
	println!();
	println!(
		"You can also revert all proposed changes with: garage layout revert --version {}",
		next_version
	);

	Ok(())
}

/// Applies the staged role changes and publishes the resulting layout.
///
/// The `--version` value must equal the current layout version plus one.
/// Staged changes are merged into the roles, the partition assignation is
/// recomputed, the staging area is cleared (and its hash recomputed), the
/// version is incremented and the layout is sent to `rpc_host`.
///
/// # Errors
///
/// Returns an error if the layout cannot be fetched or sent, if `--version`
/// is missing or wrong, or if the partition assignation cannot be computed.
///
/// # Panics
///
/// Panics if serializing the cleared staging area fails.
pub async fn cmd_apply_layout(
	rpc_cli: &Endpoint<SystemRpc, ()>,
	rpc_host: NodeID,
	apply_opt: ApplyLayoutOpt,
) -> Result<(), Error> {
	let mut layout = fetch_layout(rpc_cli, rpc_host).await?;

	// The explicit version acts as a confirmation that the operator has
	// reviewed the layout they are about to write.
	let requested_version = match apply_opt.version {
		Some(v) => v,
		None => {
			println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
			println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
			return Err(Error::Message("--version flag is missing".into()));
		}
	};
	if requested_version != layout.version + 1 {
		return Err(Error::Message("Invalid value of --version flag".into()));
	}

	layout.roles.merge(&layout.staging);

	let assignable = layout.calculate_partition_assignation();
	if !assignable {
		return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
	}

	// Staged changes are now part of the roles: empty the staging area and
	// keep its hash in sync with the new (empty) content.
	layout.staging.clear();
	let staging_bytes = rmp_to_vec_all_named(&layout.staging).unwrap();
	layout.staging_hash = blake2sum(&staging_bytes[..]);

	layout.version += 1;

	send_layout(rpc_cli, rpc_host, layout).await?;

	println!("New cluster layout with updated role assignation has been applied in cluster.");
	println!("Data will now be moved around between nodes accordingly.");

	Ok(())
}

/// Discards all staged role changes and publishes the resulting layout.
///
/// The `--version` value must equal the current layout version plus one.
/// The staging area is cleared (and its hash recomputed), the layout version
/// is incremented and the layout is sent to `rpc_host`. Roles themselves are
/// left untouched.
///
/// # Errors
///
/// Returns an error if the layout cannot be fetched or sent, or if
/// `--version` is missing or wrong.
///
/// # Panics
///
/// Panics if serializing the cleared staging area fails.
pub async fn cmd_revert_layout(
	rpc_cli: &Endpoint<SystemRpc, ()>,
	rpc_host: NodeID,
	revert_opt: RevertLayoutOpt,
) -> Result<(), Error> {
	let mut layout = fetch_layout(rpc_cli, rpc_host).await?;

	// The explicit version acts as a confirmation that the operator has
	// reviewed the layout they are about to write.
	let requested_version = match revert_opt.version {
		Some(v) => v,
		None => {
			println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
			println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
			return Err(Error::Message("--version flag is missing".into()));
		}
	};
	if requested_version != layout.version + 1 {
		return Err(Error::Message("Invalid value of --version flag".into()));
	}

	// Empty the staging area and keep its hash in sync with the new content.
	layout.staging.clear();
	let staging_bytes = rmp_to_vec_all_named(&layout.staging).unwrap();
	layout.staging_hash = blake2sum(&staging_bytes[..]);

	layout.version += 1;

	send_layout(rpc_cli, rpc_host, layout).await?;

	println!("All proposed role changes in cluster layout have been canceled.");
	Ok(())
}

// --- utility ---

pub async fn fetch_layout(
	rpc_cli: &Endpoint<SystemRpc, ()>,
	rpc_host: NodeID,
) -> Result<ClusterLayout, Error> {
	match rpc_cli
		.call(&rpc_host, &SystemRpc::PullClusterLayout, PRIO_NORMAL)
		.await??
	{
		SystemRpc::AdvertiseClusterLayout(t) => Ok(t),
		resp => Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
	}
}

/// Sends `layout` to `rpc_host` wrapped in an `AdvertiseClusterLayout` RPC.
///
/// The content of a successful response is ignored.
///
/// # Errors
///
/// Returns an error if the RPC call fails or the node answers with an error.
pub async fn send_layout(
	rpc_cli: &Endpoint<SystemRpc, ()>,
	rpc_host: NodeID,
	layout: ClusterLayout,
) -> Result<(), Error> {
	let advertisement = SystemRpc::AdvertiseClusterLayout(layout);
	rpc_cli
		.call(&rpc_host, &advertisement, PRIO_NORMAL)
		.await??;
	Ok(())
}

/// Prints a table of the nodes that have a role in `layout.roles`.
///
/// Entries whose role is `None` are skipped. Returns `true` if at least one
/// node was listed; when there is none, nothing is printed and `false` is
/// returned.
pub fn print_cluster_layout(layout: &ClusterLayout) -> bool {
	let mut rows = vec!["ID\tTags\tZone\tCapacity".to_string()];

	rows.extend(layout.roles.items().iter().filter_map(|(id, _, entry)| {
		entry.0.as_ref().map(|role| {
			format!(
				"{:?}\t{}\t{}\t{}",
				id,
				role.tags.join(","),
				role.zone,
				role.capacity_string()
			)
		})
	}));

	// The first row is the header, so real content means more than one row.
	let has_roles = rows.len() > 1;
	if has_roles {
		format_table(rows);
	}
	has_roles
}

/// Prints a table of the role changes staged in `layout.staging`.
///
/// Staged assignments are shown with their tags, zone and capacity; staged
/// removals (a `None` role) are shown as `REMOVED`. Returns `false` without
/// printing anything when no change is staged, `true` otherwise.
pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
	let staged = layout.staging.items();
	if staged.is_empty() {
		return false;
	}

	println!();
	println!("==== STAGED ROLE CHANGES ====");

	let mut rows = vec!["ID\tTags\tZone\tCapacity".to_string()];
	rows.extend(staged.iter().map(|(id, _, change)| match &change.0 {
		Some(role) => format!(
			"{:?}\t{}\t{}\t{}",
			id,
			role.tags.join(","),
			role.zone,
			role.capacity_string()
		),
		None => format!("{:?}\tREMOVED", id),
	}));
	format_table(rows);
	true
}