use std::collections::BTreeMap;
use std::sync::Arc;

use serde::{Deserialize, Serialize};

use garage_db as db;
use garage_util::data::*;

use garage_table::crdt::*;
use garage_table::*;

use crate::index_counter::*;
use crate::k2v::causality::*;
use crate::k2v::sub::*;

pub const ENTRIES: &str = "entries";
pub const CONFLICTS: &str = "conflicts";
pub const VALUES: &str = "values";
pub const BYTES: &str = "bytes";

mod v08 {
	use crate::k2v::causality::K2VNodeId;
	use garage_util::data::Uuid;
	use serde::{Deserialize, Serialize};
	use std::collections::BTreeMap;

	#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
	pub struct K2VItem {
		pub partition: K2VItemPartition,
		pub sort_key: String,

		pub(super) items: BTreeMap<K2VNodeId, DvvsEntry>,
	}

	#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)]
	pub struct K2VItemPartition {
		pub bucket_id: Uuid,
		pub partition_key: String,
	}

	#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
	pub struct DvvsEntry {
		pub(super) t_discard: u64,
		pub(super) values: Vec<(u64, DvvsValue)>,
	}

	#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
	pub enum DvvsValue {
		Value(#[serde(with = "serde_bytes")] Vec<u8>),
		Deleted,
	}

	impl garage_util::migrate::InitialFormat for K2VItem {}
}

pub use v08::*;

impl K2VItem {
	/// Creates a new K2VItem when no previous entry existed in the db
	pub fn new(bucket_id: Uuid, partition_key: String, sort_key: String) -> Self {
		Self {
			partition: K2VItemPartition {
				bucket_id,
				partition_key,
			},
			sort_key,
			items: BTreeMap::new(),
		}
	}
	/// Updates a K2VItem with a new value or a deletion event
	pub fn update(
		&mut self,
		this_node: Uuid,
		context: &Option<CausalContext>,
		new_value: DvvsValue,
		node_ts: u64,
	) -> u64 {
		if let Some(context) = context {
			for (node, t_discard) in context.vector_clock.iter() {
				if let Some(e) = self.items.get_mut(node) {
					e.t_discard = std::cmp::max(e.t_discard, *t_discard);
				} else {
					self.items.insert(
						*node,
						DvvsEntry {
							t_discard: *t_discard,
							values: vec![],
						},
					);
				}
			}
		}

		self.discard();

		let node_id = make_node_id(this_node);
		let e = self.items.entry(node_id).or_insert(DvvsEntry {
			t_discard: 0,
			values: vec![],
		});
		let t_prev = e.max_time();
		let t_new = std::cmp::max(t_prev + 1, node_ts + 1);
		e.values.push((t_new, new_value));
		t_new
	}

	/// Extract the causality context of a K2V Item
	pub fn causal_context(&self) -> CausalContext {
		let mut cc = CausalContext::new();
		for (node, ent) in self.items.iter() {
			cc.vector_clock.insert(*node, ent.max_time());
		}
		cc
	}

	/// Extract the list of values
	pub fn values(&'_ self) -> Vec<&'_ DvvsValue> {
		let mut ret = vec![];
		for (_, ent) in self.items.iter() {
			for (_, v) in ent.values.iter() {
				if !ret.contains(&v) {
					ret.push(v);
				}
			}
		}
		ret
	}

	fn discard(&mut self) {
		for (_, ent) in self.items.iter_mut() {
			ent.discard();
		}
	}
}

impl DvvsEntry {
	fn max_time(&self) -> u64 {
		self.values
			.iter()
			.fold(self.t_discard, |acc, (vts, _)| std::cmp::max(acc, *vts))
	}

	fn discard(&mut self) {
		self.values = std::mem::take(&mut self.values)
			.into_iter()
			.filter(|(t, _)| *t > self.t_discard)
			.collect::<Vec<_>>();
	}
}

impl Crdt for K2VItem {
	fn merge(&mut self, other: &Self) {
		for (node, e2) in other.items.iter() {
			if let Some(e) = self.items.get_mut(node) {
				e.merge(e2);
			} else {
				self.items.insert(*node, e2.clone());
			}
		}
	}
}

impl Crdt for DvvsEntry {
	fn merge(&mut self, other: &Self) {
		self.t_discard = std::cmp::max(self.t_discard, other.t_discard);
		self.discard();

		let t_max = self.max_time();
		for (vt, vv) in other.values.iter() {
			if *vt > t_max {
				self.values.push((*vt, vv.clone()));
			}
		}
	}
}

impl PartitionKey for K2VItemPartition {
	fn hash(&self) -> Hash {
		use blake2::{Blake2b512, Digest};

		let mut hasher = Blake2b512::new();
		hasher.update(self.bucket_id.as_slice());
		hasher.update(self.partition_key.as_bytes());
		let mut hash = [0u8; 32];
		hash.copy_from_slice(&hasher.finalize()[..32]);
		hash.into()
	}
}

impl Entry<K2VItemPartition, String> for K2VItem {
	fn partition_key(&self) -> &K2VItemPartition {
		&self.partition
	}
	fn sort_key(&self) -> &String {
		&self.sort_key
	}
	fn is_tombstone(&self) -> bool {
		self.values()
			.iter()
			.all(|v| matches!(v, DvvsValue::Deleted))
	}
}

pub struct K2VItemTable {
	pub(crate) counter_table: Arc<IndexCounter<K2VItem>>,
	pub(crate) subscriptions: Arc<SubscriptionManager>,
}

#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct ItemFilter {
	pub exclude_only_tombstones: bool,
	pub conflicts_only: bool,
}

impl TableSchema for K2VItemTable {
	const TABLE_NAME: &'static str = "k2v_item";

	type P = K2VItemPartition;
	type S = String;
	type E = K2VItem;
	type Filter = ItemFilter;

	fn updated(
		&self,
		tx: &mut db::Transaction,
		old: Option<&Self::E>,
		new: Option<&Self::E>,
	) -> db::TxOpResult<()> {
		// 1. Count
		let counter_res = self.counter_table.count(tx, old, new);
		if let Err(e) = db::unabort(counter_res)? {
			// This result can be returned by `counter_table.count()` for instance
			// if messagepack serialization or deserialization fails at some step.
			// Warn admin but ignore this error for now, that's all we can do.
			error!(
				"Unable to update K2V item counter: {}. Index values will be wrong!",
				e
			);
		}

		// 2. Notify
		if let Some(new_ent) = new {
			self.subscriptions.notify(new_ent);
		}

		Ok(())
	}

	#[allow(clippy::nonminimal_bool)]
	fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
		let v = entry.values();
		!(filter.conflicts_only && v.len() < 2)
			&& !(filter.exclude_only_tombstones && entry.is_tombstone())
	}
}

impl CountedItem for K2VItem {
	const COUNTER_TABLE_NAME: &'static str = "k2v_index_counter_v2";

	// Partition key = bucket id
	type CP = Uuid;
	// Sort key = K2V item's partition key
	type CS = String;

	fn counter_partition_key(&self) -> &Uuid {
		&self.partition.bucket_id
	}
	fn counter_sort_key(&self) -> &String {
		&self.partition.partition_key
	}

	fn counts(&self) -> Vec<(&'static str, i64)> {
		let values = self.values();

		let n_entries = if self.is_tombstone() { 0 } else { 1 };
		let n_conflicts = if values.len() > 1 { 1 } else { 0 };
		let n_values = values
			.iter()
			.filter(|v| matches!(v, DvvsValue::Value(_)))
			.count() as i64;
		let n_bytes = values
			.iter()
			.map(|v| match v {
				DvvsValue::Deleted => 0,
				DvvsValue::Value(v) => v.len() as i64,
			})
			.sum();

		vec![
			(ENTRIES, n_entries),
			(CONFLICTS, n_conflicts),
			(VALUES, n_values),
			(BYTES, n_bytes),
		]
	}
}

#[cfg(test)]
mod tests {
	use super::*;

	#[test]
	fn test_dvvsentry_merge_simple() {
		let e1 = DvvsEntry {
			t_discard: 4,
			values: vec![
				(5, DvvsValue::Value(vec![15])),
				(6, DvvsValue::Value(vec![16])),
			],
		};
		let e2 = DvvsEntry {
			t_discard: 5,
			values: vec![(6, DvvsValue::Value(vec![16])), (7, DvvsValue::Deleted)],
		};

		let mut e3 = e1;
		e3.merge(&e2);
		assert_eq!(e2, e3);
	}
}