use garage_util::data::*;

use garage_table::*;

use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;

use crate::cli::*;

use super::*;

impl AdminRpcHandler {
	pub(super) async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
		match cmd {
			BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList(
				self.garage.block_manager.list_resync_errors()?,
			)),
			BlockOperation::Info { hash } => self.handle_block_info(hash).await,
			BlockOperation::RetryNow { all, blocks } => {
				self.handle_block_retry_now(*all, blocks).await
			}
			BlockOperation::Purge { yes, blocks } => self.handle_block_purge(*yes, blocks).await,
		}
	}

	async fn handle_block_info(&self, hash: &String) -> Result<AdminRpc, Error> {
		let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
		let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
		let refcount = self.garage.block_manager.get_block_rc(&hash)?;
		let block_refs = self
			.garage
			.block_ref_table
			.get_range(&hash, None, None, 10000, Default::default())
			.await?;
		let mut versions = vec![];
		for br in block_refs {
			if let Some(v) = self
				.garage
				.version_table
				.get(&br.version, &EmptyKey)
				.await?
			{
				versions.push(Ok(v));
			} else {
				versions.push(Err(br.version));
			}
		}
		Ok(AdminRpc::BlockInfo {
			hash,
			refcount,
			versions,
		})
	}

	async fn handle_block_retry_now(
		&self,
		all: bool,
		blocks: &[String],
	) -> Result<AdminRpc, Error> {
		if all {
			if !blocks.is_empty() {
				return Err(Error::BadRequest(
					"--all was specified, cannot also specify blocks".into(),
				));
			}
			let blocks = self.garage.block_manager.list_resync_errors()?;
			for b in blocks.iter() {
				self.garage.block_manager.resync.clear_backoff(&b.hash)?;
			}
			Ok(AdminRpc::Ok(format!(
				"{} blocks returned in queue for a retry now (check logs to see results)",
				blocks.len()
			)))
		} else {
			for hash in blocks {
				let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
				let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
				self.garage.block_manager.resync.clear_backoff(&hash)?;
			}
			Ok(AdminRpc::Ok(format!(
				"{} blocks returned in queue for a retry now (check logs to see results)",
				blocks.len()
			)))
		}
	}

	async fn handle_block_purge(&self, yes: bool, blocks: &[String]) -> Result<AdminRpc, Error> {
		if !yes {
			return Err(Error::BadRequest(
				"Pass the --yes flag to confirm block purge operation.".into(),
			));
		}

		let mut obj_dels = 0;
		let mut ver_dels = 0;

		for hash in blocks {
			let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
			let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
			let block_refs = self
				.garage
				.block_ref_table
				.get_range(&hash, None, None, 10000, Default::default())
				.await?;

			for br in block_refs {
				let version = match self
					.garage
					.version_table
					.get(&br.version, &EmptyKey)
					.await?
				{
					Some(v) => v,
					None => continue,
				};

				if let Some(object) = self
					.garage
					.object_table
					.get(&version.bucket_id, &version.key)
					.await?
				{
					let ov = object.versions().iter().rev().find(|v| v.is_complete());
					if let Some(ov) = ov {
						if ov.uuid == br.version {
							let del_uuid = gen_uuid();
							let deleted_object = Object::new(
								version.bucket_id,
								version.key.clone(),
								vec![ObjectVersion {
									uuid: del_uuid,
									timestamp: ov.timestamp + 1,
									state: ObjectVersionState::Complete(
										ObjectVersionData::DeleteMarker,
									),
								}],
							);
							self.garage.object_table.insert(&deleted_object).await?;
							obj_dels += 1;
						}
					}
				}

				if !version.deleted.get() {
					let deleted_version =
						Version::new(version.uuid, version.bucket_id, version.key.clone(), true);
					self.garage.version_table.insert(&deleted_version).await?;
					ver_dels += 1;
				}
			}
		}
		Ok(AdminRpc::Ok(format!(
			"{} blocks were purged: {} object deletion markers added, {} versions marked deleted",
			blocks.len(),
			obj_dels,
			ver_dels
		)))
	}
}