use base64::prelude::*;
use hyper::{Method, StatusCode};
use std::time::Duration;

use assert_json_diff::assert_json_eq;
use serde_json::json;

use super::json_body;
use crate::common;

#[tokio::test]
async fn test_poll_item() {
	let ctx = common::context();
	let bucket = ctx.create_bucket("test-k2v-poll-item");

	// Write initial value
	let res = ctx
		.k2v
		.request
		.builder(bucket.clone())
		.method(Method::PUT)
		.path("root")
		.query_param("sort_key", Some("test1"))
		.body(b"Initial value".to_vec())
		.send()
		.await
		.unwrap();
	assert_eq!(res.status(), StatusCode::NO_CONTENT);

	// Retrieve initial value to get its causality token
	let res2 = ctx
		.k2v
		.request
		.builder(bucket.clone())
		.path("root")
		.query_param("sort_key", Some("test1"))
		.signed_header("accept", "application/octet-stream")
		.send()
		.await
		.unwrap();
	assert_eq!(res2.status(), StatusCode::OK);
	let ct = res2
		.headers()
		.get("x-garage-causality-token")
		.unwrap()
		.to_str()
		.unwrap()
		.to_string();

	let res2_body = hyper::body::to_bytes(res2.into_body())
		.await
		.unwrap()
		.to_vec();
	assert_eq!(res2_body, b"Initial value");

	// Start poll operation
	let poll = {
		let bucket = bucket.clone();
		let ct = ct.clone();
		tokio::spawn(async move {
			let ctx = common::context();
			ctx.k2v
				.request
				.builder(bucket.clone())
				.path("root")
				.query_param("sort_key", Some("test1"))
				.query_param("causality_token", Some(ct))
				.query_param("timeout", Some("10"))
				.signed_header("accept", "application/octet-stream")
				.send()
				.await
		})
	};

	// Write new value that supersedes initial one
	let res = ctx
		.k2v
		.request
		.builder(bucket.clone())
		.method(Method::PUT)
		.path("root")
		.query_param("sort_key", Some("test1"))
		.signed_header("x-garage-causality-token", ct)
		.body(b"New value".to_vec())
		.send()
		.await
		.unwrap();
	assert_eq!(res.status(), StatusCode::NO_CONTENT);

	// Check poll finishes with correct value
	let poll_res = tokio::select! {
		_ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
		res = poll => res.unwrap().unwrap(),
	};

	assert_eq!(poll_res.status(), StatusCode::OK);

	let poll_res_body = hyper::body::to_bytes(poll_res.into_body())
		.await
		.unwrap()
		.to_vec();
	assert_eq!(poll_res_body, b"New value");
}

#[tokio::test]
async fn test_poll_range() {
	let ctx = common::context();
	let bucket = ctx.create_bucket("test-k2v-poll-range");

	// Write initial value
	let res = ctx
		.k2v
		.request
		.builder(bucket.clone())
		.method(Method::PUT)
		.path("root")
		.query_param("sort_key", Some("test1"))
		.body(b"Initial value".to_vec())
		.send()
		.await
		.unwrap();
	assert_eq!(res.status(), StatusCode::NO_CONTENT);

	// Retrieve initial value to get its causality token
	let res2 = ctx
		.k2v
		.request
		.builder(bucket.clone())
		.path("root")
		.query_param("sort_key", Some("test1"))
		.signed_header("accept", "application/octet-stream")
		.send()
		.await
		.unwrap();
	assert_eq!(res2.status(), StatusCode::OK);
	let ct = res2
		.headers()
		.get("x-garage-causality-token")
		.unwrap()
		.to_str()
		.unwrap()
		.to_string();

	// Initial poll range, retrieve single item and first seen_marker
	let res2 = ctx
		.k2v
		.request
		.builder(bucket.clone())
		.method(Method::POST)
		.path("root")
		.query_param("poll_range", None::<String>)
		.body(b"{}".to_vec())
		.send()
		.await
		.unwrap();
	assert_eq!(res2.status(), StatusCode::OK);
	let json_res = json_body(res2).await;
	let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string();
	assert_json_eq!(
		json_res,
		json!(
			{
				"items": [
				  {"sk": "test1", "ct": ct, "v": [BASE64_STANDARD.encode(b"Initial value")]},
				],
				"seenMarker": seen_marker,
			}
		)
	);

	// Second poll range, which will complete later
	let poll = {
		let bucket = bucket.clone();
		tokio::spawn(async move {
			let ctx = common::context();
			ctx.k2v
				.request
				.builder(bucket.clone())
				.method(Method::POST)
				.path("root")
				.query_param("poll_range", None::<String>)
				.body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes())
				.send()
				.await
		})
	};

	// Write new value that supersedes initial one
	let res = ctx
		.k2v
		.request
		.builder(bucket.clone())
		.method(Method::PUT)
		.path("root")
		.query_param("sort_key", Some("test1"))
		.signed_header("x-garage-causality-token", ct)
		.body(b"New value".to_vec())
		.send()
		.await
		.unwrap();
	assert_eq!(res.status(), StatusCode::NO_CONTENT);

	// Check poll finishes with correct value
	let poll_res = tokio::select! {
		_ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
		res = poll => res.unwrap().unwrap(),
	};

	assert_eq!(poll_res.status(), StatusCode::OK);
	let json_res = json_body(poll_res).await;
	let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string();
	assert_eq!(json_res["items"].as_array().unwrap().len(), 1);
	assert_json_eq!(&json_res["items"][0]["sk"], json!("test1"));
	assert_json_eq!(
		&json_res["items"][0]["v"],
		json!([BASE64_STANDARD.encode(b"New value")])
	);

	// Now we will add a value on a different key
	// Start a new poll operation
	let poll = {
		let bucket = bucket.clone();
		tokio::spawn(async move {
			let ctx = common::context();
			ctx.k2v
				.request
				.builder(bucket.clone())
				.method(Method::POST)
				.path("root")
				.query_param("poll_range", None::<String>)
				.body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes())
				.send()
				.await
		})
	};

	// Write value on different key
	let res = ctx
		.k2v
		.request
		.builder(bucket.clone())
		.method(Method::PUT)
		.path("root")
		.query_param("sort_key", Some("test2"))
		.body(b"Other value".to_vec())
		.send()
		.await
		.unwrap();
	assert_eq!(res.status(), StatusCode::NO_CONTENT);

	// Check poll finishes with correct value
	let poll_res = tokio::select! {
		_ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
		res = poll => res.unwrap().unwrap(),
	};

	assert_eq!(poll_res.status(), StatusCode::OK);
	let json_res = json_body(poll_res).await;
	assert_eq!(json_res["items"].as_array().unwrap().len(), 1);
	assert_json_eq!(&json_res["items"][0]["sk"], json!("test2"));
	assert_json_eq!(
		&json_res["items"][0]["v"],
		json!([BASE64_STANDARD.encode(b"Other value")])
	);
}