use core::ops::Bound;
use std::collections::{hash_map, BTreeMap, HashMap};
use std::marker::PhantomData;
use std::sync::Arc;

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};

use garage_db as db;

use garage_rpc::ring::Ring;
use garage_rpc::system::System;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;

use garage_table::crdt::*;
use garage_table::replication::*;
use garage_table::*;

/// An item whose occurrences are tallied into named counters.
///
/// An implementor tells under which counter entry it is accounted
/// (`counter_partition_key` / `counter_sort_key`) and how much it contributes
/// to each named counter (`counts`).
pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
	/// Name of the counter table. It is also used to build the name of the
	/// local counter tree and the name of the propagation worker.
	const COUNTER_TABLE_NAME: &'static str;

	/// Partition key type of the counter entries.
	type CP: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
	/// Sort key type of the counter entries.
	type CS: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;

	/// Partition key of the counter entry this item is accounted under.
	fn counter_partition_key(&self) -> &Self::CP;
	/// Sort key of the counter entry this item is accounted under.
	fn counter_sort_key(&self) -> &Self::CS;
	/// Contribution of this item to the counters, as `(counter name, amount)` pairs.
	fn counts(&self) -> Vec<(&'static str, i64)>;
}

/// A counter entry in the global table
///
/// One entry exists per `(pk, sk)` pair and holds, for every counter name,
/// the values reported node by node.
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct CounterEntry<T: CountedItem> {
	/// Partition key of the entry.
	pub pk: T::CP,
	/// Sort key of the entry.
	pub sk: T::CS,
	/// Counter name -> per-node values of that counter.
	pub values: BTreeMap<String, CounterValue>,
}

impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
	fn partition_key(&self) -> &T::CP {
		&self.pk
	}

	fn sort_key(&self) -> &T::CS {
		&self.sk
	}

	/// An entry is a tombstone when every value of every counter, on every
	/// node, is zero (this includes the case where there are no values at all).
	fn is_tombstone(&self) -> bool {
		self.values
			.values()
			.flat_map(|counter| counter.node_values.values())
			.all(|(_, amount)| *amount == 0)
	}
}

impl<T: CountedItem> CounterEntry<T> {
	/// Same as [`Self::filtered_values_with_nodes`], using the nodes listed
	/// in the layout of `ring`.
	pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> {
		self.filtered_values_with_nodes(&ring.layout.node_id_vec[..])
	}

	/// Collapses the per-node values of each counter into a single number.
	///
	/// Only values reported by a node contained in `nodes` are considered;
	/// among those the largest one is returned. A counter for which none of
	/// the given nodes reported a value is absent from the result.
	pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap<String, i64> {
		self.values
			.iter()
			.filter_map(|(name, counter)| {
				counter
					.node_values
					.iter()
					.filter(|(node, _)| nodes.contains(node))
					.map(|(_, (_, amount))| *amount)
					.max()
					.map(|largest| (name.clone(), largest))
			})
			.collect()
	}
}

/// The values of a single named counter, as reported by each node.
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct CounterValue {
	/// Node identifier -> `(timestamp, value)` last reported by that node.
	/// The timestamp is what is compared when two copies are merged.
	pub node_values: BTreeMap<Uuid, (u64, i64)>,
}

impl<T: CountedItem> Crdt for CounterEntry<T> {
	/// Merges `other` into `self`, counter by counter: counters known on both
	/// sides are merged, counters only known to `other` are copied.
	fn merge(&mut self, other: &Self) {
		for (name, theirs) in &other.values {
			match self.values.get_mut(name) {
				Some(ours) => ours.merge(theirs),
				None => {
					self.values.insert(name.clone(), theirs.clone());
				}
			}
		}
	}
}

impl Crdt for CounterValue {
	/// Merges `other` into `self`, node by node.
	///
	/// For every node, the `(timestamp, value)` pair with the strictly larger
	/// timestamp is kept; on equal timestamps the pair already in `self` stays.
	/// Nodes that only appear in `other` are copied over.
	fn merge(&mut self, other: &Self) {
		for (node, (t2, e2)) in other.node_values.iter() {
			match self.node_values.get_mut(node) {
				Some((t, e)) => {
					if *t2 > *t {
						// The timestamp has to move together with the value.
						// Keeping the old timestamp would let a later merge
						// with an older copy overwrite this newer value, and
						// the result would depend on the order of the merges.
						*t = *t2;
						*e = *e2;
					}
				}
				None => {
					self.node_values.insert(*node, (*t2, *e2));
				}
			}
		}
	}
}

/// Schema of the table holding the counters of items of type `T`.
///
/// The struct stores no data: `T` only appears at the type level.
pub struct CounterTable<T: CountedItem> {
	// Marker tying the schema to the counted item type.
	_phantom_t: PhantomData<T>,
}

impl<T: CountedItem> TableSchema for CounterTable<T> {
	const TABLE_NAME: &'static str = T::COUNTER_TABLE_NAME;

	type P = T::CP;
	type S = T::CS;
	type E = CounterEntry<T>;
	/// A deletion filter, plus the nodes whose values are taken into account
	/// when deciding whether an entry counts as deleted.
	type Filter = (DeletedFilter, Vec<Uuid>);

	/// Tells whether `entry` passes `filter`.
	///
	/// With `DeletedFilter::Any` everything passes. Otherwise the entry is
	/// considered a tombstone when all the values visible through the given
	/// nodes are zero, and the deletion filter decides from there.
	fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
		let accepts_everything = filter.0 == DeletedFilter::Any;
		accepts_everything || {
			let visible = entry.filtered_values_with_nodes(&filter.1);
			let all_zero = visible.values().all(|amount| *amount == 0);
			filter.0.apply(all_zero)
		}
	}
}

// ----

/// Maintains the counters of items of type `T`: a local tree holding this
/// node's own values, and the replicated counter table they are propagated to.
pub struct IndexCounter<T: CountedItem> {
	/// Identifier of the local node (taken from `System::id`).
	this_node: Uuid,
	/// Local tree storing this node's own counter values (`LocalCounterEntry`).
	local_counter: db::Tree,
	/// Sending side of the channel feeding the background propagation worker.
	propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>,
	/// The counter table into which local values are inserted.
	pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
}

impl<T: CountedItem> IndexCounter<T> {
	pub fn new(
		system: Arc<System>,
		replication: TableShardedReplication,
		db: &db::Db,
	) -> Arc<Self> {
		let background = system.background.clone();

		let (propagate_tx, propagate_rx) = mpsc::unbounded_channel();

		let this = Arc::new(Self {
			this_node: system.id,
			local_counter: db
				.open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME))
				.expect("Unable to open local counter tree"),
			propagate_tx,
			table: Table::new(
				CounterTable {
					_phantom_t: Default::default(),
				},
				replication,
				system,
				db,
			),
		});

		background.spawn_worker(IndexPropagatorWorker {
			index_counter: this.clone(),
			propagate_rx,
			buf: HashMap::new(),
			errors: 0,
		});

		this
	}

	pub fn count(
		&self,
		tx: &mut db::Transaction,
		old: Option<&T>,
		new: Option<&T>,
	) -> db::TxResult<(), Error> {
		let pk = old
			.map(|e| e.counter_partition_key())
			.unwrap_or_else(|| new.unwrap().counter_partition_key());
		let sk = old
			.map(|e| e.counter_sort_key())
			.unwrap_or_else(|| new.unwrap().counter_sort_key());

		// calculate counter differences
		let mut counts = HashMap::new();
		for (k, v) in old.map(|x| x.counts()).unwrap_or_default() {
			*counts.entry(k).or_insert(0) -= v;
		}
		for (k, v) in new.map(|x| x.counts()).unwrap_or_default() {
			*counts.entry(k).or_insert(0) += v;
		}

		// update local counter table
		let tree_key = self.table.data.tree_key(pk, sk);

		let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
			Some(old_bytes) => {
				rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes)
					.map_err(Error::RmpDecode)
					.map_err(db::TxError::Abort)?
			}
			None => LocalCounterEntry {
				pk: pk.clone(),
				sk: sk.clone(),
				values: BTreeMap::new(),
			},
		};

		let now = now_msec();
		for (s, inc) in counts.iter() {
			let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
			ent.0 = std::cmp::max(ent.0 + 1, now);
			ent.1 += *inc;
		}

		let new_entry_bytes = rmp_to_vec_all_named(&entry)
			.map_err(Error::RmpEncode)
			.map_err(db::TxError::Abort)?;
		tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;

		if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) {
			error!(
				"Could not propagate updated counter values, failed to send to channel: {}",
				e
			);
		}

		Ok(())
	}

	/// Recomputes this node's counters from the content of `counted_table`.
	///
	/// Works in two passes, each walking its tree in batches:
	/// 1. every existing local counter is reset to zero (with a bumped
	///    timestamp) and the zeroed values are merged into the counter table;
	/// 2. every entry of `counted_table`'s store is decoded and its counts are
	///    added to the matching local counter, which is again merged into the
	///    counter table.
	///
	/// NOTE(review): the reads and writes on the local tree are not done in a
	/// transaction; presumably this must not run concurrently with `count`
	/// (hence "offline") - confirm with callers.
	///
	/// # Errors
	///
	/// Returns the first database or (de)serialization error encountered.
	///
	/// # Panics
	///
	/// Panics if a stored local counter does not carry the partition and sort
	/// keys of the counted entry that maps to it.
	pub fn offline_recount_all<TS, TR>(
		&self,
		counted_table: &Arc<Table<TS, TR>>,
	) -> Result<(), Error>
	where
		TS: TableSchema<E = T>,
		TR: TableReplication,
	{
		// Writes `entry` into the counter table's data, merging it with the
		// entry already stored under the same key if there is one.
		let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> {
			let entry_k = self
				.table
				.data
				.tree_key(entry.partition_key(), entry.sort_key());
			self.table
				.data
				.update_entry_with(&entry_k, |ent| match ent {
					Some(mut ent) => {
						ent.merge(&entry);
						ent
					}
					None => entry.clone(),
				})?;
			Ok(())
		};

		// 1. Set all old local counters to zero
		let now = now_msec();
		// Last key handled so far; the next batch starts strictly after it.
		let mut next_start: Option<Vec<u8>> = None;
		loop {
			let low_bound = match next_start.take() {
				Some(v) => Bound::Excluded(v),
				None => Bound::Unbounded,
			};
			// Read one batch (the check below lets it grow to 1001 items)
			// before modifying anything.
			let mut batch = vec![];
			for item in self.local_counter.range((low_bound, Bound::Unbounded))? {
				batch.push(item?);
				if batch.len() > 1000 {
					break;
				}
			}

			// Nothing left after `next_start`: this pass is finished.
			if batch.is_empty() {
				break;
			}

			info!("zeroing old counters... ({})", hex::encode(&batch[0].0));
			for (local_counter_k, local_counter) in batch {
				let mut local_counter =
					rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?;

				// Zero every value, with a timestamp that is both larger than
				// the previous one and at least `now`.
				for (_, tv) in local_counter.values.iter_mut() {
					tv.0 = std::cmp::max(tv.0 + 1, now);
					tv.1 = 0;
				}

				let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
				self.local_counter
					.insert(&local_counter_k, &local_counter_bytes)?;

				// Reflect the zeroed values in the counter table as well.
				let counter_entry = local_counter.into_counter_entry(self.this_node);
				save_counter_entry(counter_entry)?;

				next_start = Some(local_counter_k);
			}
		}

		// 2. Recount all table entries
		let now = now_msec();
		let mut next_start: Option<Vec<u8>> = None;
		loop {
			let low_bound = match next_start.take() {
				Some(v) => Bound::Excluded(v),
				None => Bound::Unbounded,
			};
			// Same batching scheme as above, this time over the counted table.
			let mut batch = vec![];
			for item in counted_table
				.data
				.store
				.range((low_bound, Bound::Unbounded))?
			{
				batch.push(item?);
				if batch.len() > 1000 {
					break;
				}
			}

			if batch.is_empty() {
				break;
			}

			info!("counting entries... ({})", hex::encode(&batch[0].0));
			for (counted_entry_k, counted_entry) in batch {
				let counted_entry = counted_table.data.decode_entry(&counted_entry)?;

				let pk = counted_entry.counter_partition_key();
				let sk = counted_entry.counter_sort_key();
				let counts = counted_entry.counts();

				// Several counted entries may map to the same local counter,
				// so it is read back and accumulated into on every iteration.
				let local_counter_key = self.table.data.tree_key(pk, sk);
				let mut local_counter = match self.local_counter.get(&local_counter_key)? {
					Some(old_bytes) => {
						let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(
							&old_bytes,
						)?;
						// The stored entry must be the one for these keys.
						assert!(ent.pk == *pk);
						assert!(ent.sk == *sk);
						ent
					}
					None => LocalCounterEntry {
						pk: pk.clone(),
						sk: sk.clone(),
						values: BTreeMap::new(),
					},
				};
				// Add this entry's counts, bumping the timestamps as in pass 1.
				for (s, v) in counts.iter() {
					let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0));
					tv.0 = std::cmp::max(tv.0 + 1, now);
					tv.1 += v;
				}

				let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
				self.local_counter
					.insert(&local_counter_key, local_counter_bytes)?;

				let counter_entry = local_counter.into_counter_entry(self.this_node);
				save_counter_entry(counter_entry)?;

				// Paging follows the keys of the counted table, not the
				// local counter keys.
				next_start = Some(counted_entry_k);
			}
		}

		// Done
		Ok(())
	}
}

/// Background worker that batches local counter updates and inserts them
/// into the counter table.
struct IndexPropagatorWorker<T: CountedItem> {
	/// The index counter whose table the updates are inserted into.
	index_counter: Arc<IndexCounter<T>>,
	/// Receiving side of the channel fed by `IndexCounter::count`.
	propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,

	/// Entries waiting to be inserted, indexed by their tree key.
	buf: HashMap<Vec<u8>, CounterEntry<T>>,
	/// Number of failed insertions since the last successful one.
	errors: usize,
}

impl<T: CountedItem> IndexPropagatorWorker<T> {
	/// Queues a local counter update: it is converted into a counter table
	/// entry for this node and merged with whatever is already pending for
	/// the same key.
	fn add_ent(&mut self, pk: T::CP, sk: T::CS, counters: LocalCounterEntry<T>) {
		let counter = &self.index_counter;
		let key = counter.table.data.tree_key(&pk, &sk);
		let incoming = counters.into_counter_entry(counter.this_node);

		if let Some(pending) = self.buf.get_mut(&key) {
			pending.merge(&incoming);
		} else {
			self.buf.insert(key, incoming);
		}
	}
}

#[async_trait]
impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
	fn name(&self) -> String {
		format!("{} counter", T::COUNTER_TABLE_NAME)
	}

	fn status(&self) -> WorkerStatus {
		WorkerStatus {
			queue_length: Some(self.buf.len() as u64),
			..Default::default()
		}
	}

	async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
		// This loop batches updates to counters to be sent all at once.
		// They are sent once the propagate_rx channel has been emptied (or is closed).
		let closed = loop {
			match self.propagate_rx.try_recv() {
				Ok((pk, sk, counters)) => {
					self.add_ent(pk, sk, counters);
				}
				Err(mpsc::error::TryRecvError::Empty) => break false,
				Err(mpsc::error::TryRecvError::Disconnected) => break true,
			}
		};

		if !self.buf.is_empty() {
			let entries_k = self.buf.keys().take(100).cloned().collect::<Vec<_>>();
			let entries = entries_k.iter().map(|k| self.buf.get(k).unwrap());
			if let Err(e) = self.index_counter.table.insert_many(entries).await {
				self.errors += 1;
				if self.errors >= 2 && *must_exit.borrow() {
					error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e);
					return Ok(WorkerState::Done);
				}
				// Propagate error up to worker manager, it will log it, increment a counter,
				// and sleep for a certain delay (with exponential backoff), waiting for
				// things to go back to normal
				return Err(e);
			} else {
				for k in entries_k {
					self.buf.remove(&k);
				}
				self.errors = 0;
			}

			return Ok(WorkerState::Busy);
		} else if closed {
			return Ok(WorkerState::Done);
		} else {
			return Ok(WorkerState::Idle);
		}
	}

	async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
		match self.propagate_rx.recv().await {
			Some((pk, sk, counters)) => {
				self.add_ent(pk, sk, counters);
				WorkerState::Busy
			}
			None => match self.buf.is_empty() {
				false => WorkerState::Busy,
				true => WorkerState::Done,
			},
		}
	}
}

/// This node's own counter values for one `(pk, sk)` pair, as stored in the
/// local counter tree.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
struct LocalCounterEntry<T: CountedItem> {
	/// Partition key of the counter entry.
	pk: T::CP,
	/// Sort key of the counter entry.
	sk: T::CS,
	/// Counter name -> `(timestamp, value)`; the timestamp is bumped on every update.
	values: BTreeMap<String, (u64, i64)>,
}

impl<T: CountedItem> LocalCounterEntry<T> {
	/// Turns the local values into a counter table entry in which every
	/// counter holds a single value, attributed to `this_node`.
	fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> {
		let Self { pk, sk, values } = self;

		let values = values
			.into_iter()
			.map(|(name, stamped_value)| {
				let node_values = std::iter::once((this_node, stamped_value)).collect();
				(name, CounterValue { node_values })
			})
			.collect();

		CounterEntry { pk, sk, values }
	}
}