use core::ops::Bound;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::watch;

use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::persister::Persister;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;

use crate::manager::*;

/// Delay between the completion of one full scrub and the automatic start
/// of the next one (used by `ScrubWorker::wait_for_work`).
const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30); // full scrub every 30 days

/// Background worker that puts blocks in the resync queue in two phases:
/// first every hash listed in the RC table, then every block file found
/// in the data directory.
pub struct RepairWorker {
	manager: Arc<BlockManager>,
	/// Phase 1 cursor: the last hash that was put in the resync queue. The
	/// next batch is read strictly after it. `None` before the first batch.
	next_start: Option<Hash>,
	/// `None` during phase 1; holds the on-disk iterator once phase 2 begins.
	block_iter: Option<BlockStoreIterator>,
}

impl RepairWorker {
	pub fn new(manager: Arc<BlockManager>) -> Self {
		Self {
			manager,
			next_start: None,
			block_iter: None,
		}
	}
}

#[async_trait]
impl Worker for RepairWorker {
	/// Human-readable name of this worker.
	fn name(&self) -> String {
		String::from("Block repair worker")
	}

	/// Reports the current phase: in phase 1, the first (at most) four bytes
	/// of the RC table cursor in hex; in phase 2, the on-disk iteration progress.
	fn info(&self) -> Option<String> {
		if let Some(iter) = &self.block_iter {
			return Some(format!("Phase 2: {:.2}% done", iter.progress() * 100.));
		}

		let cursor: &[u8] = match &self.next_start {
			Some(h) => h.as_slice(),
			None => &[],
		};
		// Only show a short prefix of the cursor.
		let prefix = &cursor[..cursor.len().min(4)];
		Some(format!("Phase 1: {}", hex::encode(prefix)))
	}

	/// Performs one unit of repair work and returns the resulting worker state.
	///
	/// In phase 1 this handles one batch of up to 1000 hashes from the RC table;
	/// in phase 2 it handles a single block found on disk. `Done` is returned
	/// only once the on-disk iteration is exhausted.
	async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
		// Phase 2: Repair blocks actually on disk.
		// Every block file found in the data directory is put in the resync queue,
		// so that blocks we store but do not actually need can be offloaded if
		// necessary and then deleted locally.
		if let Some(disk_iter) = self.block_iter.as_mut() {
			return match disk_iter.next().await? {
				Some(hash) => {
					self.manager.put_to_resync(&hash, Duration::from_secs(0))?;
					Ok(WorkerState::Busy)
				}
				None => Ok(WorkerState::Done),
			};
		}

		// Phase 1: Repair blocks from RC table.
		//
		// This is deliberately a two-step process: first collect a batch of hashes
		// from the RC table, and only once the table iterator is gone, insert them
		// in the resync queue. The reason is SQLite: as long as an iterator is open
		// on a DB table, nothing else can be done on the DB. Iterating on the RC
		// table and inserting in the resync queue item by item (the previous, naive
		// approach) would deadlock in the SQLite adapter code, because the Rust
		// bindings assume the worst case where SQLite is not compiled in thread-safe
		// mode and everything is therefore wrapped in a mutex (see
		// db/sqlite_adapter.rs and the discussion in PR #322).
		// TODO: maybe do this with tokio::task::spawn_blocking ?
		let lower_bound = match &self.next_start {
			Some(cursor) => Bound::Excluded(cursor.as_slice()),
			None => Bound::Unbounded,
		};
		let mut batch = Vec::new();
		for item in self
			.manager
			.rc
			.rc
			.range::<&[u8], _>((lower_bound, Bound::Unbounded))?
		{
			let (key, _) = item?;
			batch.push(Hash::try_from(&key[..]).unwrap());
			if batch.len() >= 1000 {
				break;
			}
		}

		if batch.is_empty() {
			// RC table exhausted: move on to phase 2
			self.block_iter = Some(BlockStoreIterator::new(&self.manager));
		} else {
			for hash in batch {
				self.manager.put_to_resync(&hash, Duration::from_secs(0))?;
				// Advance the cursor only after the hash was successfully queued.
				self.next_start = Some(hash);
			}
		}
		Ok(WorkerState::Busy)
	}

	/// Never expected to be called: `work` only ever returns `Busy` or `Done`.
	async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
		unreachable!()
	}
}

// ----

/// Background worker that reads every block found in the data directory
/// (via `BlockManager::read_block`) and counts the ones reported as corrupt.
/// It is driven by commands received on `rx_cmd` and restarts by itself
/// `SCRUB_INTERVAL` after the last completed scrub.
pub struct ScrubWorker {
	manager: Arc<BlockManager>,
	/// Channel on which control commands are received.
	rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,

	/// Current state of the scrub (running, paused or finished).
	work: ScrubWorkerState,
	/// Produces the worker state returned after each scrubbed block, based on
	/// the persisted tranquility value (presumably to throttle the scrub).
	tranquilizer: Tranquilizer,

	/// Saves and loads `persisted` ("scrub_info" in the metadata directory).
	persister: Persister<ScrubWorkerPersisted>,
	/// In-memory copy of the persisted scrub information.
	persisted: ScrubWorkerPersisted,
}

/// Scrub information that is saved through the persister.
#[derive(Serialize, Deserialize)]
struct ScrubWorkerPersisted {
	/// Value handed to `Tranquilizer::tranquilize_worker` after each scrubbed
	/// block; defaults to 4 when nothing could be loaded.
	tranquility: u32,
	/// Timestamp (from `now_msec()`) of the last scrub that ran to completion;
	/// 0 when nothing could be loaded.
	time_last_complete_scrub: u64,
	/// Number of blocks for which `read_block` reported corrupt data.
	corruptions_detected: u64,
}

/// State machine of the scrub worker.
enum ScrubWorkerState {
	/// A scrub is in progress; holds the iterator over blocks on disk.
	Running(BlockStoreIterator),
	/// A scrub is suspended; the iterator is kept so it can continue where it
	/// stopped. The u64 is the time (same clock as `now_msec()`) at which the
	/// scrub resumes automatically.
	Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub
	/// No scrub in progress (also the default / initial state).
	Finished,
}

impl Default for ScrubWorkerState {
	fn default() -> Self {
		ScrubWorkerState::Finished
	}
}

/// Commands accepted by the scrub worker on its command channel.
#[derive(Debug)]
pub enum ScrubWorkerCommand {
	/// Start a new scrub; only accepted when no scrub is running or paused.
	Start,
	/// Pause a running scrub (or re-arm an existing pause) for the given duration.
	Pause(Duration),
	/// Resume a paused scrub immediately.
	Resume,
	/// Abandon a running or paused scrub.
	Cancel,
	/// Change the tranquility value and persist it.
	SetTranquility(u32),
}

impl ScrubWorker {
	pub fn new(manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>) -> Self {
		let persister = Persister::new(&manager.system.metadata_dir, "scrub_info");
		let persisted = match persister.load() {
			Ok(v) => v,
			Err(_) => ScrubWorkerPersisted {
				time_last_complete_scrub: 0,
				tranquility: 4,
				corruptions_detected: 0,
			},
		};
		Self {
			manager,
			rx_cmd,
			work: ScrubWorkerState::Finished,
			tranquilizer: Tranquilizer::new(30),
			persister,
			persisted,
		}
	}

	async fn handle_cmd(&mut self, cmd: ScrubWorkerCommand) {
		match cmd {
			ScrubWorkerCommand::Start => {
				self.work = match std::mem::take(&mut self.work) {
					ScrubWorkerState::Finished => {
						let iterator = BlockStoreIterator::new(&self.manager);
						ScrubWorkerState::Running(iterator)
					}
					work => {
						error!("Cannot start scrub worker: already running!");
						work
					}
				};
			}
			ScrubWorkerCommand::Pause(dur) => {
				self.work = match std::mem::take(&mut self.work) {
					ScrubWorkerState::Running(it) | ScrubWorkerState::Paused(it, _) => {
						ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64)
					}
					work => {
						error!("Cannot pause scrub worker: not running!");
						work
					}
				};
			}
			ScrubWorkerCommand::Resume => {
				self.work = match std::mem::take(&mut self.work) {
					ScrubWorkerState::Paused(it, _) => ScrubWorkerState::Running(it),
					work => {
						error!("Cannot resume scrub worker: not paused!");
						work
					}
				};
			}
			ScrubWorkerCommand::Cancel => {
				self.work = match std::mem::take(&mut self.work) {
					ScrubWorkerState::Running(_) | ScrubWorkerState::Paused(_, _) => {
						ScrubWorkerState::Finished
					}
					work => {
						error!("Cannot cancel scrub worker: not running!");
						work
					}
				}
			}
			ScrubWorkerCommand::SetTranquility(t) => {
				self.persisted.tranquility = t;
				if let Err(e) = self.persister.save_async(&self.persisted).await {
					error!("Could not save new tranquilitiy value: {}", e);
				}
			}
		}
	}
}

#[async_trait]
impl Worker for ScrubWorker {
	/// Human-readable name of this worker.
	fn name(&self) -> String {
		String::from("Block scrub worker")
	}

	/// Describes the current scrub state, followed by the number of
	/// corruptions detected so far.
	fn info(&self) -> Option<String> {
		let status = match &self.work {
			ScrubWorkerState::Running(iter) => {
				let percent = iter.progress() * 100.;
				format!(
					"{:.2}% done (tranquility = {})",
					percent, self.persisted.tranquility
				)
			}
			ScrubWorkerState::Paused(iter, resume_at) => {
				let percent = iter.progress() * 100.;
				format!(
					"Paused, {:.2}% done, resumes at {}",
					percent,
					msec_to_rfc3339(*resume_at)
				)
			}
			ScrubWorkerState::Finished => {
				let last_scrub = msec_to_rfc3339(self.persisted.time_last_complete_scrub);
				format!("Last completed scrub: {}", last_scrub)
			}
		};
		let corruptions = self.persisted.corruptions_detected;
		Some(format!("{} ; corruptions detected: {}", status, corruptions))
	}

	/// Handles at most one pending command, then scrubs at most one block.
	///
	/// Returns `Done` when the command channel is disconnected, `Idle` when no
	/// scrub is running (or the running scrub just completed), and otherwise
	/// the state computed by the tranquilizer.
	async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
		match self.rx_cmd.try_recv() {
			Ok(cmd) => self.handle_cmd(cmd).await,
			Err(mpsc::error::TryRecvError::Empty) => (),
			Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerState::Done),
		};

		// Fetch the next block to check, if a scrub is in progress.
		let next_hash = match &mut self.work {
			ScrubWorkerState::Running(iter) => {
				self.tranquilizer.reset();
				iter.next().await?
			}
			ScrubWorkerState::Paused(..) | ScrubWorkerState::Finished => {
				return Ok(WorkerState::Idle)
			}
		};

		let hash = match next_hash {
			Some(hash) => hash,
			None => {
				// All blocks were visited. The completion time is persisted before
				// the state changes, so a failed save leaves the worker in
				// `Running` and the save is attempted again on the next call.
				self.persisted.time_last_complete_scrub = now_msec();
				self.persister.save_async(&self.persisted).await?;
				self.work = ScrubWorkerState::Finished;
				self.tranquilizer.clear();
				return Ok(WorkerState::Idle);
			}
		};

		match self.manager.read_block(&hash).await {
			Ok(_) => (),
			Err(Error::CorruptData(_)) => {
				error!("Found corrupt data block during scrub: {:?}", hash);
				self.persisted.corruptions_detected += 1;
				self.persister.save_async(&self.persisted).await?;
			}
			// Any other read error aborts this unit of work.
			Err(e) => return Err(e),
		}

		let tranquility = self.persisted.tranquility;
		Ok(self.tranquilizer.tranquilize_worker(tranquility))
	}

	/// Waits until there is something to do: either the automatic deadline is
	/// reached (resume time of a pause, or `SCRUB_INTERVAL` after the last
	/// complete scrub) or a command arrives on the channel.
	async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
		// Determine when to wake up by ourselves, and what to do at that time.
		let (deadline, pending_cmd) = match &self.work {
			ScrubWorkerState::Running(_) => return WorkerState::Busy,
			ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
			ScrubWorkerState::Finished => {
				let interval_msec = SCRUB_INTERVAL.as_millis() as u64;
				(
					self.persisted.time_last_complete_scrub + interval_msec,
					ScrubWorkerCommand::Start,
				)
			}
		};

		let now = now_msec();
		if now >= deadline {
			// Deadline already passed: act immediately.
			self.handle_cmd(pending_cmd).await;
			return WorkerState::Busy;
		}

		let remaining = Duration::from_millis(deadline - now);
		select! {
			_ = tokio::time::sleep(remaining) => self.handle_cmd(pending_cmd).await,
			received = self.rx_cmd.recv() => match received {
				Some(cmd) => self.handle_cmd(cmd).await,
				// Channel closed: nobody can control this worker anymore.
				None => return WorkerState::Done,
			}
		}

		if matches!(self.work, ScrubWorkerState::Running(_)) {
			WorkerState::Busy
		} else {
			WorkerState::Idle
		}
	}
}

// ----

/// Asynchronous depth-first iterator over the block files stored under the
/// block manager's data directory.
struct BlockStoreIterator {
	/// Stack of directories being traversed, root first; the last element is
	/// the directory currently being read. Empty once iteration is complete.
	path: Vec<ReadingDir>,
}

/// State of one directory on the traversal stack of `BlockStoreIterator`.
enum ReadingDir {
	/// Directory whose entries have not been listed yet.
	Pending(PathBuf),
	/// Directory whose entries have all been listed.
	Read {
		/// Entries of the directory, in the order returned by the OS.
		subpaths: Vec<fs::DirEntry>,
		/// Index in `subpaths` of the next entry to examine.
		pos: usize,
	},
}

impl BlockStoreIterator {
	/// Creates an iterator that will start at the root of the manager's data directory.
	fn new(manager: &BlockManager) -> Self {
		let root_dir = manager.data_dir.clone();
		Self {
			path: vec![ReadingDir::Pending(root_dir)],
		}
	}

	/// Returns progress done, between 0 and 1
	///
	/// The estimate treats each level of the traversal stack as one "digit":
	/// the index of the entry currently being processed, weighted by the product
	/// of the entry counts of all directories above it.
	fn progress(&self) -> f32 {
		if self.path.is_empty() {
			1.0
		} else {
			let mut ret = 0.0;
			let mut next_div = 1;
			for p in self.path.iter() {
				match p {
					// Nothing is known yet about a directory that was not listed.
					ReadingDir::Pending(_) => break,
					ReadingDir::Read { subpaths, pos } => {
						// Between two calls to `next`, every `Read` directory left on
						// the stack is non-empty and has `pos >= 1` (`pos - 1` is the
						// entry being processed). Do not rely on that invariant here:
						// clamp instead of underflowing or dividing by zero.
						next_div *= subpaths.len().max(1);
						ret += (pos.saturating_sub(1) as f32) / (next_div as f32);
					}
				}
			}
			ret
		}
	}

	/// Returns the hash of the next block file found, or `Ok(None)` once the
	/// whole tree has been traversed.
	///
	/// Directory entries are interpreted by name, after stripping an optional
	/// `.zst` suffix: a directory named with 2 hex characters is descended into,
	/// and an entry named with 64 hex characters yields the corresponding hash.
	/// Anything else, including names that are not valid UTF-8, is skipped.
	///
	/// # Errors
	///
	/// Returns an error if listing a directory or querying the type of an
	/// entry fails.
	async fn next(&mut self) -> Result<Option<Hash>, Error> {
		loop {
			let last_path = match self.path.last_mut() {
				None => return Ok(None),
				Some(lp) => lp,
			};

			// List the directory entirely the first time it is reached.
			if let ReadingDir::Pending(path) = last_path {
				let mut reader = fs::read_dir(&path).await?;
				let mut subpaths = vec![];
				while let Some(ent) = reader.next_entry().await? {
					subpaths.push(ent);
				}
				*last_path = ReadingDir::Read { subpaths, pos: 0 };
			}

			let (subpaths, pos) = match *last_path {
				ReadingDir::Read {
					ref subpaths,
					ref mut pos,
				} => (subpaths, pos),
				ReadingDir::Pending(_) => unreachable!(),
			};

			let data_dir_ent = match subpaths.get(*pos) {
				None => {
					// Directory exhausted: go back to its parent.
					self.path.pop();
					continue;
				}
				Some(ent) => {
					*pos += 1;
					ent
				}
			};

			let name = data_dir_ent.file_name();
			let name = if let Ok(n) = name.into_string() {
				n
			} else {
				// Name is not valid UTF-8: skip this entry.
				continue;
			};
			let ent_type = data_dir_ent.file_type().await?;

			let name = name.strip_suffix(".zst").unwrap_or(&name);
			if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
				// Intermediate directory: it is listed on the next loop iteration.
				let path = data_dir_ent.path();
				self.path.push(ReadingDir::Pending(path));
			} else if name.len() == 64 {
				if let Ok(h) = hex::decode(&name) {
					// 64 hex characters always decode to exactly 32 bytes.
					let mut hash = [0u8; 32];
					hash.copy_from_slice(&h);
					return Ok(Some(hash.into()));
				}
			}
		}
	}
}