use std::collections::BTreeMap;
use std::process::exit;
use std::time::Duration;

use k2v_client::*;

use garage_util::formater::format_table;

use rusoto_core::credential::AwsCredentials;
use rusoto_core::Region;

use clap::{Parser, Subcommand};

/// K2V command line interface
// Top-level options shared by every subcommand. Each one except the
// subcommand itself can also be supplied through the environment variable
// named in its `env = "..."` attribute.
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
	/// Name of the region to use
	#[clap(short, long, env = "AWS_REGION", default_value = "garage")]
	region: String,
	/// Url of the endpoint to connect to
	#[clap(short, long, env = "K2V_ENDPOINT")]
	endpoint: String,
	/// Access key ID
	#[clap(short, long, env = "AWS_ACCESS_KEY_ID")]
	key_id: String,
	/// Secret access key
	#[clap(short, long, env = "AWS_SECRET_ACCESS_KEY")]
	secret: String,
	/// Bucket name
	#[clap(short, long, env = "K2V_BUCKET")]
	bucket: String,
	#[clap(subcommand)]
	command: Command,
}

// Subcommands of the CLI. The `///` comments below are picked up by clap as
// help text, so they are kept short and user-facing.
#[derive(Subcommand, Debug)]
enum Command {
	/// Insert a single value
	Insert {
		/// Partition key to insert to
		partition_key: String,
		/// Sort key to insert to
		sort_key: String,
		/// Causality of the insertion
		#[clap(short, long)]
		causality: Option<String>,
		/// Value to insert
		#[clap(flatten)]
		value: Value,
	},
	/// Read a single value
	Read {
		/// Partition key to read from
		partition_key: String,
		/// Sort key to read from
		sort_key: String,
		/// Output formatting
		#[clap(flatten)]
		output_kind: ReadOutputKind,
	},
	/// Watch changes on a single value
	PollItem {
		/// Partition key of item to watch
		partition_key: String,
		/// Sort key of item to watch
		sort_key: String,
		/// Causality information
		#[clap(short, long)]
		causality: String,
		/// Timeout, in seconds
		#[clap(short = 'T', long)]
		timeout: Option<u64>,
		/// Output formatting
		#[clap(flatten)]
		output_kind: ReadOutputKind,
	},
	/// Watch changes on a range of values
	PollRange {
		/// Partition key to poll from
		partition_key: String,
		/// Output only sort keys matching this filter
		#[clap(flatten)]
		filter: Filter,
		/// Marker of data that had previously been seen by a PollRange
		#[clap(short = 'S', long)]
		seen_marker: Option<String>,
		/// Timeout, in seconds
		#[clap(short = 'T', long)]
		timeout: Option<u64>,
		/// Output formatting
		#[clap(flatten)]
		output_kind: BatchOutputKind,
	},
	/// Delete a single value
	Delete {
		/// Partition key to delete from
		partition_key: String,
		/// Sort key to delete from
		sort_key: String,
		/// Causality information
		#[clap(short, long)]
		causality: String,
	},
	/// List partition keys
	ReadIndex {
		/// Output formatting
		#[clap(flatten)]
		output_kind: BatchOutputKind,
		/// Output only partition keys matching this filter
		#[clap(flatten)]
		filter: Filter,
	},
	/// Read a range of sort keys
	ReadRange {
		/// Partition key to read from
		partition_key: String,
		/// Output formatting
		#[clap(flatten)]
		output_kind: BatchOutputKind,
		/// Output only sort keys matching this filter
		#[clap(flatten)]
		filter: Filter,
	},
	/// Delete a range of sort keys
	DeleteRange {
		/// Partition key to delete from
		partition_key: String,
		/// Output formatting
		#[clap(flatten)]
		output_kind: BatchOutputKind,
		/// Delete only sort keys matching this filter
		#[clap(flatten)]
		filter: Filter,
	},
}

/// Where to read a value from
// The "value" argument group below is declared `required(true)` and
// `multiple(false)`: the command line must set exactly one of `file`, `b64`
// or `text`. `Value::to_data` relies on that and treats "none set" as
// unreachable.
#[derive(Parser, Debug)]
#[clap(group = clap::ArgGroup::new("value").multiple(false).required(true))]
struct Value {
	/// Read value from a file. use - to read from stdin
	#[clap(short, long, group = "value")]
	file: Option<String>,
	/// Read a base64 value from commandline
	#[clap(short, long, group = "value")]
	b64: Option<String>,
	/// Read a raw (UTF-8) value from the commandline
	#[clap(short, long, group = "value")]
	text: Option<String>,
}

impl Value {
	async fn to_data(&self) -> Result<Vec<u8>, Error> {
		if let Some(ref text) = self.text {
			Ok(text.as_bytes().to_vec())
		} else if let Some(ref b64) = self.b64 {
			base64::decode(b64).map_err(|_| Error::Message("invalid base64 input".into()))
		} else if let Some(ref path) = self.file {
			use tokio::io::AsyncReadExt;
			if path == "-" {
				let mut file = tokio::io::stdin();
				let mut vec = Vec::new();
				file.read_to_end(&mut vec).await?;
				Ok(vec)
			} else {
				let mut file = tokio::fs::File::open(path).await?;
				let mut vec = Vec::new();
				file.read_to_end(&mut vec).await?;
				Ok(vec)
			}
		} else {
			unreachable!("Value must have one option set")
		}
	}
}

// Output selection for single-item commands. The "output-kind" group is
// optional and `multiple(false)`, so at most one flag can be set; with none
// set, `display_output` falls through to the human-readable format.
#[derive(Parser, Debug)]
#[clap(group = clap::ArgGroup::new("output-kind").multiple(false).required(false))]
struct ReadOutputKind {
	/// Base64 output. Conflicts are line separated, first line is causality token
	#[clap(short, long, group = "output-kind")]
	b64: bool,
	/// Raw output. Conflicts generate error, causality token is not returned
	#[clap(short, long, group = "output-kind")]
	raw: bool,
	/// Human formatted output
	#[clap(short = 'H', long, group = "output-kind")]
	human: bool,
	/// JSON formatted output
	#[clap(short, long, group = "output-kind")]
	json: bool,
}

impl ReadOutputKind {
	/// Prints `val` in the selected format and terminates the process.
	///
	/// Formats are tried in the order JSON, raw, base64, then human-readable
	/// as the fallback. Exit codes: `0` on success; in raw mode, `1` when
	/// the item does not hold exactly one value and `2` when that single
	/// value is a tombstone.
	fn display_output(&self, val: CausalValue) -> ! {
		use std::io::Write;

		if self.json {
			serde_json::to_writer_pretty(std::io::stdout(), &val).unwrap();
			exit(0);
		}

		if self.raw {
			// Raw mode has no way to represent several concurrent values.
			let mut versions = val.value;
			let count = versions.len();
			if count != 1 {
				eprintln!(
					"Raw mode can only read non-concurent values, found {} values, expected 1",
					count
				);
				exit(1);
			}
			match versions.pop().unwrap() {
				K2vValue::Tombstone => {
					eprintln!("Expected value, found tombstone");
					exit(2);
				}
				K2vValue::Value(bytes) => {
					std::io::stdout().write_all(&bytes).unwrap();
					exit(0);
				}
			}
		}

		let token: String = val.causality.into();

		if self.b64 {
			// First line: causality token. Then one line per value, a
			// tombstone being rendered as an empty line.
			println!("{}", token);
			for item in val.value {
				let line = match item {
					K2vValue::Value(bytes) => base64::encode(&bytes),
					K2vValue::Tombstone => String::new(),
				};
				println!("{}", line);
			}
			exit(0);
		}

		// human
		println!("causality: {}", token);
		println!("values:");
		for item in val.value {
			match item {
				K2vValue::Tombstone => println!("  tombstone"),
				K2vValue::Value(bytes) => match std::str::from_utf8(&bytes) {
					Ok(text) => println!("  utf-8: {}", text),
					Err(_) => println!("  base64: {}", base64::encode(&bytes)),
				},
			}
		}
		exit(0);
	}
}

// Output selection for batch/range commands. At most one flag of the optional
// "output-kind" group can be set; code in this file only ever tests `json`,
// so anything else results in the human-readable format.
#[derive(Parser, Debug)]
#[clap(group = clap::ArgGroup::new("output-kind").multiple(false).required(false))]
struct BatchOutputKind {
	/// Human formatted output
	#[clap(short = 'H', long, group = "output-kind")]
	human: bool,
	/// JSON formatted output
	#[clap(short, long, group = "output-kind")]
	json: bool,
}

impl BatchOutputKind {
	/// Prints every entry of `values` in human-readable form (key, causality
	/// token, then one line per value) and exits with status 0.
	///
	/// Values that are valid UTF-8 are printed as text, other byte strings
	/// as base64, and tombstones as the word `tombstone`.
	fn display_human_output(&self, values: BTreeMap<String, CausalValue>) -> ! {
		for (key, item) in values {
			println!("key: {}", key);
			let token: String = item.causality.into();
			println!("causality: {}", token);
			for version in item.value {
				match version {
					K2vValue::Tombstone => println!("  tombstone"),
					K2vValue::Value(bytes) => match std::str::from_utf8(&bytes) {
						Ok(text) => println!("  value(utf-8): {}", text),
						Err(_) => println!("  value(base64): {}", base64::encode(&bytes)),
					},
				}
			}
		}
		exit(0);
	}

	/// Serializes each entry of `values` to a JSON object and adds the map
	/// key to that object under `"sort_key"`.
	///
	/// Panics if an entry cannot be serialized or does not serialize to a
	/// JSON object.
	fn values_json(&self, values: BTreeMap<String, CausalValue>) -> Vec<serde_json::Value> {
		let mut entries = Vec::with_capacity(values.len());
		for (key, item) in values {
			let mut entry = serde_json::to_value(item).unwrap();
			let fields = entry.as_object_mut().unwrap();
			fields.insert("sort_key".to_owned(), key.into());
			entries.push(entry);
		}
		entries
	}

	/// Prints the result of a range poll (the seen marker plus the returned
	/// items) as JSON or in human-readable form, then exits with status 0.
	fn display_poll_range_output(
		&self,
		seen_marker: String,
		values: BTreeMap<String, CausalValue>,
	) -> ! {
		if !self.json {
			println!("seen marker: {}", seen_marker);
			// Never returns: exits once the items are printed.
			self.display_human_output(values);
		}

		let document = serde_json::json!({
			"values": self.values_json(values),
			"seen_marker": seen_marker,
		});
		serde_json::to_writer_pretty(std::io::stdout(), &document).unwrap();
		exit(0)
	}

	/// Prints one page of a range read (the continuation key, if any, plus
	/// the items) as JSON or in human-readable form, then exits with status 0.
	fn display_read_range_output(&self, res: PaginatedRange<CausalValue>) -> ! {
		if !self.json {
			if let Some(next) = res.next_start {
				println!("next key: {}", next);
			}
			// Never returns: exits once the items are printed.
			self.display_human_output(res.items);
		}

		let document = serde_json::json!({
			"next_key": res.next_start,
			"values": self.values_json(res.items),
		});
		serde_json::to_writer_pretty(std::io::stdout(), &document).unwrap();
		exit(0)
	}
}

/// Filter for batch operations
// The "filter" argument group is `required(true)` and `multiple(true)`: at
// least one of `prefix`, `start`, `end` or `all` must be given, and several
// of them may be combined. `limit`, `reverse`, `conflicts_only` and
// `tombstones` are not part of the group; `main` rejects them for the
// subcommands that cannot honour them.
#[derive(Parser, Debug)]
#[clap(group = clap::ArgGroup::new("filter").multiple(true).required(true))]
struct Filter {
	/// Match only keys starting with this prefix
	#[clap(short, long, group = "filter")]
	prefix: Option<String>,
	/// Match only keys lexicographically after this key (including this key itself)
	#[clap(short, long, group = "filter")]
	start: Option<String>,
	/// Match only keys lexicographically before this key (excluding this key)
	#[clap(short, long, group = "filter")]
	end: Option<String>,
	/// Only match the first X keys
	#[clap(short, long)]
	limit: Option<u64>,
	/// Return keys in reverse order
	#[clap(short, long)]
	reverse: bool,
	/// Return only keys where conflict happened
	#[clap(short, long)]
	conflicts_only: bool,
	/// Also include keys storing only tombstones
	#[clap(short, long)]
	tombstones: bool,
	/// Return any key
	// Not read anywhere in this file: it only serves to satisfy the required
	// "filter" group when no prefix/start/end restriction is wanted.
	#[clap(short, long, group = "filter")]
	all: bool,
}

impl Filter {
	/// Builds the client-side `k2v_client::Filter` from the range options
	/// (`start`, `end`, `prefix`, `limit`, `reverse`), borrowing the strings
	/// from `self`. The remaining flags are not part of that filter.
	fn k2v_filter(&self) -> k2v_client::Filter<'_> {
		let Self {
			prefix,
			start,
			end,
			limit,
			reverse,
			..
		} = self;

		k2v_client::Filter {
			start: start.as_deref(),
			end: end.as_deref(),
			prefix: prefix.as_deref(),
			limit: *limit,
			reverse: *reverse,
		}
	}
}

/// Entry point: parses the command line, builds a K2V client for the given
/// region/endpoint, bucket and credentials, then runs the selected
/// subcommand.
///
/// Several output helpers terminate the process themselves once they have
/// printed a result, so the final `Ok(())` is only reached by the branches
/// that print inline (or print nothing).
///
/// # Errors
/// Returns an error when the client cannot be created, when a request fails,
/// when the value to insert cannot be read, or when a filter option is not
/// supported by the selected subcommand.
#[tokio::main]
async fn main() -> Result<(), Error> {
	let args = Args::parse();

	let region = Region::Custom {
		name: args.region,
		endpoint: args.endpoint,
	};

	let creds = AwsCredentials::new(args.key_id, args.secret, None, None);

	let client = K2vClient::new(region, args.bucket, creds, None)?;

	match args.command {
		Command::Insert {
			partition_key,
			sort_key,
			causality,
			value,
		} => {
			client
				.insert_item(
					&partition_key,
					&sort_key,
					value.to_data().await?,
					causality.map(Into::into),
				)
				.await?;
		}
		Command::Delete {
			partition_key,
			sort_key,
			causality,
		} => {
			client
				.delete_item(&partition_key, &sort_key, causality.into())
				.await?;
		}
		Command::Read {
			partition_key,
			sort_key,
			output_kind,
		} => {
			let res = client.read_item(&partition_key, &sort_key).await?;
			output_kind.display_output(res);
		}
		Command::PollItem {
			partition_key,
			sort_key,
			causality,
			timeout,
			output_kind,
		} => {
			let timeout = timeout.map(Duration::from_secs);
			let res_opt = client
				.poll_item(&partition_key, &sort_key, causality.into(), timeout)
				.await?;
			// `None` means the poll ended without a new value to show.
			if let Some(res) = res_opt {
				output_kind.display_output(res);
			} else if output_kind.json {
				println!("null");
			} else {
				println!("Delay expired and value didn't change.");
			}
		}
		Command::PollRange {
			partition_key,
			filter,
			seen_marker,
			timeout,
			output_kind,
		} => {
			// Only prefix/start/end are forwarded to the range poll below.
			if filter.conflicts_only
				|| filter.tombstones
				|| filter.reverse
				|| filter.limit.is_some()
			{
				return Err(Error::Message(
					"limit, reverse, conflicts-only, tombstones are invalid for poll-range".into(),
				));
			}

			let timeout = timeout.map(Duration::from_secs);
			let res = client
				.poll_range(
					&partition_key,
					Some(PollRangeFilter {
						start: filter.start.as_deref(),
						end: filter.end.as_deref(),
						prefix: filter.prefix.as_deref(),
					}),
					seen_marker.as_deref(),
					timeout,
				)
				.await?;
			match res {
				Some((items, seen_marker)) => {
					output_kind.display_poll_range_output(seen_marker, items);
				}
				None if output_kind.json => println!("null"),
				None => println!("Delay expired and value didn't change."),
			}
		}
		Command::ReadIndex {
			output_kind,
			filter,
		} => {
			if filter.conflicts_only || filter.tombstones {
				return Err(Error::Message(
					"conflicts-only and tombstones are invalid for read-index".into(),
				));
			}
			let res = client.read_index(filter.k2v_filter()).await?;
			if output_kind.json {
				// NOTE(review): the keys listed by this command are partition
				// keys, yet they are emitted under "sort_key". Left unchanged
				// to keep the JSON output stable; confirm the intended name.
				let values = res
					.items
					.into_iter()
					.map(|(k, v)| {
						let mut value = serde_json::to_value(v).unwrap();
						value
							.as_object_mut()
							.unwrap()
							.insert("sort_key".to_owned(), k.into());
						value
					})
					.collect::<Vec<_>>();
				let json = serde_json::json!({
					"next_key": res.next_start,
					"values": values,
				});

				let stdout = std::io::stdout();
				serde_json::to_writer_pretty(stdout, &json).unwrap();
			} else {
				if let Some(next) = res.next_start {
					println!("next key: {}", next);
				}

				// Header row first, then one tab-separated row per key.
				let mut to_print = vec!["key:\tentries\tconflicts\tvalues\tbytes".to_string()];
				to_print.extend(res.items.into_iter().map(|(k, v)| {
					format!(
						"{}\t{}\t{}\t{}\t{}",
						k, v.entries, v.conflicts, v.values, v.bytes
					)
				}));
				format_table(to_print);
			}
		}
		Command::ReadRange {
			partition_key,
			output_kind,
			filter,
		} => {
			let op = BatchReadOp {
				partition_key: &partition_key,
				filter: filter.k2v_filter(),
				conflicts_only: filter.conflicts_only,
				tombstones: filter.tombstones,
				single_item: false,
			};
			let mut res = client.read_batch(&[op]).await?;
			// A single operation was submitted, so a single result is expected.
			let res = res.pop().unwrap();
			output_kind.display_read_range_output(res);
		}
		Command::DeleteRange {
			partition_key,
			output_kind,
			filter,
		} => {
			// Reject unsupported options before building the request.
			if filter.reverse
				|| filter.conflicts_only
				|| filter.tombstones
				|| filter.limit.is_some()
			{
				return Err(Error::Message(
					"limit, conflicts-only, reverse and tombstones are invalid for delete-range"
						.into(),
				));
			}

			let op = BatchDeleteOp {
				partition_key: &partition_key,
				prefix: filter.prefix.as_deref(),
				start: filter.start.as_deref(),
				end: filter.end.as_deref(),
				single_item: false,
			};

			let res = client.delete_batch(&[op]).await?;

			if output_kind.json {
				println!("{}", res[0]);
			} else {
				println!("deleted {} keys", res[0]);
			}
		}
	}

	Ok(())
}