use std::borrow::Borrow;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;

use async_trait::async_trait;
use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;

use opentelemetry::{
	trace::{FutureExt, TraceContextExt, Tracer},
	Context,
};

use garage_db as db;

use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
use garage_util::migrate::Migrate;

use garage_rpc::rpc_helper::QuorumSetResultTracker;
use garage_rpc::system::System;
use garage_rpc::*;

use crate::crdt::Crdt;
use crate::data::*;
use crate::gc::*;
use crate::merkle::*;
use crate::queue::InsertQueueWorker;
use crate::replication::*;
use crate::schema::*;
use crate::sync::*;
use crate::util::*;

pub struct Table<F: TableSchema, R: TableReplication> {
	pub system: Arc<System>,
	pub data: Arc<TableData<F, R>>,
	pub merkle_updater: Arc<MerkleUpdater<F, R>>,
	pub syncer: Arc<TableSyncer<F, R>>,
	gc: Arc<TableGc<F, R>>,
	endpoint: Arc<Endpoint<TableRpc<F>, Self>>,
}

#[derive(Serialize, Deserialize)]
pub(crate) enum TableRpc<F: TableSchema> {
	Ok,

	ReadEntry(F::P, F::S),
	ReadEntryResponse(Option<ByteBuf>),

	// Read range: read all keys in partition P, possibly starting at a certain sort key offset
	ReadRange {
		partition: F::P,
		begin_sort_key: Option<F::S>,
		filter: Option<F::Filter>,
		limit: usize,
		enumeration_order: EnumerationOrder,
	},

	Update(Vec<Arc<ByteBuf>>),
}

impl<F: TableSchema> Rpc for TableRpc<F> {
	type Response = Result<TableRpc<F>, Error>;
}

impl<F: TableSchema, R: TableReplication> Table<F, R> {
	// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============

	pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> {
		let endpoint = system
			.netapp
			.endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME));

		let data = TableData::new(system.clone(), instance, replication, db);

		let merkle_updater = MerkleUpdater::new(data.clone());

		let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone());
		let gc = TableGc::new(system.clone(), data.clone());

		system.layout_manager.add_table(F::TABLE_NAME);

		let table = Arc::new(Self {
			system,
			data,
			merkle_updater,
			gc,
			syncer,
			endpoint,
		});

		table.endpoint.set_handler(table.clone());

		table
	}

	pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
		self.merkle_updater.spawn_workers(bg);
		self.syncer.spawn_workers(bg);
		self.gc.spawn_workers(bg);
		bg.spawn_worker(InsertQueueWorker(self.clone()));
	}

	pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
		let tracer = opentelemetry::global::tracer("garage_table");
		let span = tracer.start(format!("{} insert", F::TABLE_NAME));

		self.insert_internal(e)
			.bound_record_duration(&self.data.metrics.put_request_duration)
			.with_context(Context::current_with_span(span))
			.await?;

		self.data.metrics.put_request_counter.add(1);

		Ok(())
	}

	async fn insert_internal(&self, e: &F::E) -> Result<(), Error> {
		let hash = e.partition_key().hash();
		let who = self.data.replication.write_sets(&hash);

		let e_enc = Arc::new(ByteBuf::from(e.encode()?));
		let rpc = TableRpc::<F>::Update(vec![e_enc]);

		self.system
			.rpc_helper()
			.try_write_many_sets(
				&self.endpoint,
				who.as_ref(),
				rpc,
				RequestStrategy::with_priority(PRIO_NORMAL)
					.with_quorum(self.data.replication.write_quorum()),
			)
			.await?;

		Ok(())
	}

	/// Insert item locally
	pub fn queue_insert(&self, tx: &mut db::Transaction, e: &F::E) -> db::TxResult<(), Error> {
		self.data.queue_insert(tx, e)
	}

	pub async fn insert_many<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
	where
		I: IntoIterator<Item = IE> + Send + Sync,
		IE: Borrow<F::E> + Send + Sync,
	{
		let tracer = opentelemetry::global::tracer("garage_table");
		let span = tracer.start(format!("{} insert_many", F::TABLE_NAME));

		self.insert_many_internal(entries)
			.bound_record_duration(&self.data.metrics.put_request_duration)
			.with_context(Context::current_with_span(span))
			.await?;

		self.data.metrics.put_request_counter.add(1);

		Ok(())
	}

	async fn insert_many_internal<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
	where
		I: IntoIterator<Item = IE> + Send + Sync,
		IE: Borrow<F::E> + Send + Sync,
	{
		// The different items will have to be stored on possibly different nodes.
		// We will here batch all items into a single request for each concerned
		// node, with all of the entries it must store within that request.
		// Each entry has to be saved to a specific list of "write sets", i.e. a set
		// of node within wich a quorum must be achieved. In normal operation, there
		// is a single write set which corresponds to the quorum in the current
		// cluster layout, but when the layout is updated, multiple write sets might
		// have to be handled at once. Here, since we are sending many entries, we
		// will have to handle many write sets in all cases. The algorihtm is thus
		// to send one request to each node with all the items it must save,
		// and keep track of the OK responses within each write set: if for all sets
		// a quorum of nodes has answered OK, then the insert has succeeded and
		// consistency properties (read-after-write) are preserved.

		let quorum = self.data.replication.write_quorum();

		// Serialize all entries and compute the write sets for each of them.
		// In the case of sharded table replication, this also takes an "ack lock"
		// to the layout manager to avoid ack'ing newer versions which are not
		// taken into account by writes in progress (the ack can happen later, once
		// all writes that didn't take the new layout into account are finished).
		// These locks are released when entries_vec is dropped, i.e. when this
		// function returns.
		let mut entries_vec = Vec::new();
		for entry in entries.into_iter() {
			let entry = entry.borrow();
			let hash = entry.partition_key().hash();
			let mut write_sets = self.data.replication.write_sets(&hash);
			for set in write_sets.as_mut().iter_mut() {
				// Sort nodes in each write sets to merge write sets with same
				// nodes but in possibly different orders
				set.sort();
			}
			let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
			entries_vec.push((write_sets, e_enc));
		}

		// Compute a deduplicated list of all of the write sets,
		// and compute an index from each node to the position of the sets in which
		// it takes part, to optimize the detection of a quorum.
		let mut write_sets = entries_vec
			.iter()
			.flat_map(|(wss, _)| wss.as_ref().iter().map(|ws| ws.as_slice()))
			.collect::<Vec<&[Uuid]>>();
		write_sets.sort();
		write_sets.dedup();

		let mut result_tracker = QuorumSetResultTracker::new(&write_sets, quorum);

		// Build a map of all nodes to the entries that must be sent to that node.
		let mut call_list: HashMap<Uuid, Vec<_>> = HashMap::new();
		for (write_sets, entry_enc) in entries_vec.iter() {
			for write_set in write_sets.as_ref().iter() {
				for node in write_set.iter() {
					let node_entries = call_list.entry(*node).or_default();
					match node_entries.last() {
						Some(x) if Arc::ptr_eq(x, entry_enc) => {
							// skip if entry already in list to send to this node
							// (could happen if node is in several write sets for this entry)
						}
						_ => {
							node_entries.push(entry_enc.clone());
						}
					}
				}
			}
		}

		// Build futures to actually perform each of the corresponding RPC calls
		let call_futures = call_list.into_iter().map(|(node, entries)| {
			let this = self.clone();
			async move {
				let rpc = TableRpc::<F>::Update(entries);
				let resp = this
					.system
					.rpc_helper()
					.call(
						&this.endpoint,
						node,
						rpc,
						RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(quorum),
					)
					.await;
				(node, resp)
			}
		});

		// Run all requests in parallel thanks to FuturesUnordered, and collect results.
		let mut resps = call_futures.collect::<FuturesUnordered<_>>();

		while let Some((node, resp)) = resps.next().await {
			result_tracker.register_result(node, resp.map(|_| ()));

			if result_tracker.all_quorums_ok() {
				// Success

				// Continue all other requests in background
				tokio::spawn(async move {
					resps.collect::<Vec<(Uuid, Result<_, _>)>>().await;
				});

				return Ok(());
			}

			if result_tracker.too_many_failures() {
				// Too many errors in this set, we know we won't get a quorum
				break;
			}
		}

		// Failure, could not get quorum within at least one set
		Err(result_tracker.quorum_error())
	}

	pub async fn get(
		self: &Arc<Self>,
		partition_key: &F::P,
		sort_key: &F::S,
	) -> Result<Option<F::E>, Error> {
		let tracer = opentelemetry::global::tracer("garage_table");
		let span = tracer.start(format!("{} get", F::TABLE_NAME));

		let res = self
			.get_internal(partition_key, sort_key)
			.bound_record_duration(&self.data.metrics.get_request_duration)
			.with_context(Context::current_with_span(span))
			.await?;

		self.data.metrics.get_request_counter.add(1);

		Ok(res)
	}

	async fn get_internal(
		self: &Arc<Self>,
		partition_key: &F::P,
		sort_key: &F::S,
	) -> Result<Option<F::E>, Error> {
		let hash = partition_key.hash();
		let who = self.data.replication.read_nodes(&hash);

		let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
		let resps = self
			.system
			.rpc_helper()
			.try_call_many(
				&self.endpoint,
				&who,
				rpc,
				RequestStrategy::with_priority(PRIO_NORMAL)
					.with_quorum(self.data.replication.read_quorum()),
			)
			.await?;

		let mut ret = None;
		let mut not_all_same = false;
		for resp in resps {
			if let TableRpc::ReadEntryResponse(value) = resp {
				if let Some(v_bytes) = value {
					let v = self.data.decode_entry(v_bytes.as_slice())?;
					ret = match ret {
						None => Some(v),
						Some(mut x) => {
							if x != v {
								not_all_same = true;
								x.merge(&v);
							}
							Some(x)
						}
					}
				}
			} else {
				return Err(Error::Message("Invalid return value to read".to_string()));
			}
		}
		if let Some(ret_entry) = &ret {
			if not_all_same {
				let self2 = self.clone();
				let ent2 = ret_entry.clone();
				tokio::spawn(async move {
					if let Err(e) = self2.repair_on_read(&who[..], ent2).await {
						warn!("Error doing repair on read: {}", e);
					}
				});
			}
		}

		Ok(ret)
	}

	pub async fn get_range(
		self: &Arc<Self>,
		partition_key: &F::P,
		begin_sort_key: Option<F::S>,
		filter: Option<F::Filter>,
		limit: usize,
		enumeration_order: EnumerationOrder,
	) -> Result<Vec<F::E>, Error> {
		let tracer = opentelemetry::global::tracer("garage_table");
		let span = tracer.start(format!("{} get_range", F::TABLE_NAME));

		let res = self
			.get_range_internal(
				partition_key,
				begin_sort_key,
				filter,
				limit,
				enumeration_order,
			)
			.bound_record_duration(&self.data.metrics.get_request_duration)
			.with_context(Context::current_with_span(span))
			.await?;

		self.data.metrics.get_request_counter.add(1);

		Ok(res)
	}

	async fn get_range_internal(
		self: &Arc<Self>,
		partition_key: &F::P,
		begin_sort_key: Option<F::S>,
		filter: Option<F::Filter>,
		limit: usize,
		enumeration_order: EnumerationOrder,
	) -> Result<Vec<F::E>, Error> {
		let hash = partition_key.hash();
		let who = self.data.replication.read_nodes(&hash);

		let rpc = TableRpc::<F>::ReadRange {
			partition: partition_key.clone(),
			begin_sort_key,
			filter,
			limit,
			enumeration_order,
		};

		let resps = self
			.system
			.rpc_helper()
			.try_call_many(
				&self.endpoint,
				&who,
				rpc,
				RequestStrategy::with_priority(PRIO_NORMAL)
					.with_quorum(self.data.replication.read_quorum()),
			)
			.await?;

		let mut ret: BTreeMap<Vec<u8>, F::E> = BTreeMap::new();
		let mut to_repair = BTreeSet::new();
		for resp in resps {
			if let TableRpc::Update(entries) = resp {
				for entry_bytes in entries.iter() {
					let entry = self.data.decode_entry(entry_bytes.as_slice())?;
					let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key());
					match ret.get_mut(&entry_key) {
						Some(e) => {
							if *e != entry {
								e.merge(&entry);
								to_repair.insert(entry_key.clone());
							}
						}
						None => {
							ret.insert(entry_key, entry);
						}
					}
				}
			} else {
				return Err(Error::unexpected_rpc_message(resp));
			}
		}

		if !to_repair.is_empty() {
			let self2 = self.clone();
			let to_repair = to_repair
				.into_iter()
				.map(|k| ret.get(&k).unwrap().clone())
				.collect::<Vec<_>>();
			tokio::spawn(async move {
				for v in to_repair {
					if let Err(e) = self2.repair_on_read(&who[..], v).await {
						warn!("Error doing repair on read: {}", e);
					}
				}
			});
		}

		// At this point, the `ret` btreemap might contain more than `limit`
		// items, because nodes might have returned us each `limit` items
		// but for different keys. We have to take only the first `limit` items
		// in this map, in the specified enumeration order, for two reasons:
		// 1. To return to the user no more than the number of items that they requested
		// 2. To return only items for which we have a read quorum: we do not know
		//    that we have a read quorum for the items after the first `limit`
		//    of them
		let ret_vec = match enumeration_order {
			EnumerationOrder::Forward => ret
				.into_iter()
				.take(limit)
				.map(|(_k, v)| v)
				.collect::<Vec<_>>(),
			EnumerationOrder::Reverse => ret
				.into_iter()
				.rev()
				.take(limit)
				.map(|(_k, v)| v)
				.collect::<Vec<_>>(),
		};
		Ok(ret_vec)
	}

	// =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============

	async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
		let what_enc = Arc::new(ByteBuf::from(what.encode()?));
		self.system
			.rpc_helper()
			.try_call_many(
				&self.endpoint,
				who,
				TableRpc::<F>::Update(vec![what_enc]),
				RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(who.len()),
			)
			.await?;
		Ok(())
	}
}

#[async_trait]
impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table<F, R> {
	async fn handle(
		self: &Arc<Self>,
		msg: &TableRpc<F>,
		_from: NodeID,
	) -> Result<TableRpc<F>, Error> {
		match msg {
			TableRpc::ReadEntry(key, sort_key) => {
				let value = self.data.read_entry(key, sort_key)?;
				Ok(TableRpc::ReadEntryResponse(value))
			}
			TableRpc::ReadRange {
				partition,
				begin_sort_key,
				filter,
				limit,
				enumeration_order,
			} => {
				let values = self.data.read_range(
					partition,
					begin_sort_key,
					filter,
					*limit,
					*enumeration_order,
				)?;
				Ok(TableRpc::Update(values))
			}
			TableRpc::Update(pairs) => {
				self.data.update_many(pairs)?;
				Ok(TableRpc::Ok)
			}
			m => Err(Error::unexpected_rpc_message(m)),
		}
	}
}