use crate::storage::*;
use aws_sdk_s3::{self as s3, error::SdkError, operation::get_object::GetObjectError};
use serde::Serialize;

#[derive(Clone, Debug, Serialize)]
pub struct GarageConf {
    pub region: String,
    pub s3_endpoint: String,
    pub k2v_endpoint: String,
    pub aws_access_key_id: String,
    pub aws_secret_access_key: String,
    pub bucket: String,
}

#[derive(Clone, Debug)]
pub struct GarageBuilder {
    conf: GarageConf,
    unicity: Vec<u8>,
}

impl GarageBuilder {
    pub fn new(conf: GarageConf) -> anyhow::Result<Arc<Self>> {
        let mut unicity: Vec<u8> = vec![];
        unicity.extend_from_slice(file!().as_bytes());
        unicity.append(&mut rmp_serde::to_vec(&conf)?);
        Ok(Arc::new(Self { conf, unicity }))
    }
}

#[async_trait]
impl IBuilder for GarageBuilder {
    async fn build(&self) -> Result<Store, StorageError> {
        let s3_creds = s3::config::Credentials::new(
            self.conf.aws_access_key_id.clone(),
            self.conf.aws_secret_access_key.clone(),
            None,
            None,
            "aerogramme",
        );

        let sdk_config = aws_config::from_env()
            .region(aws_config::Region::new(self.conf.region.clone()))
            .credentials_provider(s3_creds)
            .endpoint_url(self.conf.s3_endpoint.clone())
            .load()
            .await;

        let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config)
            .force_path_style(true)
            .build();

        let s3_client = aws_sdk_s3::Client::from_conf(s3_config);

        let k2v_config = k2v_client::K2vClientConfig {
            endpoint: self.conf.k2v_endpoint.clone(),
            region: self.conf.region.clone(),
            aws_access_key_id: self.conf.aws_access_key_id.clone(),
            aws_secret_access_key: self.conf.aws_secret_access_key.clone(),
            bucket: self.conf.bucket.clone(),
            user_agent: None,
        };

        let k2v_client = match k2v_client::K2vClient::new(k2v_config) {
            Err(e) => {
                tracing::error!("unable to build k2v client: {}", e);
                return Err(StorageError::Internal);
            }
            Ok(v) => v,
        };

        Ok(Box::new(GarageStore {
            bucket: self.conf.bucket.clone(),
            s3: s3_client,
            k2v: k2v_client,
        }))
    }
    fn unique(&self) -> UnicityBuffer {
        UnicityBuffer(self.unicity.clone())
    }
}

pub struct GarageStore {
    bucket: String,
    s3: s3::Client,
    k2v: k2v_client::K2vClient,
}

fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) -> RowVal {
    let new_row_ref = row_ref.with_causality(causal_value.causality.into());
    let row_values = causal_value
        .value
        .into_iter()
        .map(|k2v_value| match k2v_value {
            k2v_client::K2vValue::Tombstone => Alternative::Tombstone,
            k2v_client::K2vValue::Value(v) => Alternative::Value(v),
        })
        .collect::<Vec<_>>();

    RowVal {
        row_ref: new_row_ref,
        value: row_values,
    }
}

#[async_trait]
impl IStore for GarageStore {
    async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
        let (pk_list, batch_op) = match select {
            Selector::Range {
                shard,
                sort_begin,
                sort_end,
            } => (
                vec![shard.to_string()],
                vec![k2v_client::BatchReadOp {
                    partition_key: shard,
                    filter: k2v_client::Filter {
                        start: Some(sort_begin),
                        end: Some(sort_end),
                        ..k2v_client::Filter::default()
                    },
                    ..k2v_client::BatchReadOp::default()
                }],
            ),
            Selector::List(row_ref_list) => (
                row_ref_list
                    .iter()
                    .map(|row_ref| row_ref.uid.shard.to_string())
                    .collect::<Vec<_>>(),
                row_ref_list
                    .iter()
                    .map(|row_ref| k2v_client::BatchReadOp {
                        partition_key: &row_ref.uid.shard,
                        filter: k2v_client::Filter {
                            start: Some(&row_ref.uid.sort),
                            ..k2v_client::Filter::default()
                        },
                        single_item: true,
                        ..k2v_client::BatchReadOp::default()
                    })
                    .collect::<Vec<_>>(),
            ),
            Selector::Prefix { shard, sort_prefix } => (
                vec![shard.to_string()],
                vec![k2v_client::BatchReadOp {
                    partition_key: shard,
                    filter: k2v_client::Filter {
                        prefix: Some(sort_prefix),
                        ..k2v_client::Filter::default()
                    },
                    ..k2v_client::BatchReadOp::default()
                }],
            ),
            Selector::Single(row_ref) => {
                let causal_value = match self
                    .k2v
                    .read_item(&row_ref.uid.shard, &row_ref.uid.sort)
                    .await
                {
                    Err(k2v_client::Error::NotFound) => {
                        tracing::debug!(
                            "K2V item not found  shard={}, sort={}, bucket={}",
                            row_ref.uid.shard,
                            row_ref.uid.sort,
                            self.bucket,
                        );
                        return Err(StorageError::NotFound);
                    }
                    Err(e) => {
                        tracing::error!(
                            "K2V read item shard={}, sort={}, bucket={} failed: {}",
                            row_ref.uid.shard,
                            row_ref.uid.sort,
                            self.bucket,
                            e
                        );
                        return Err(StorageError::Internal);
                    }
                    Ok(v) => v,
                };

                let row_val = causal_to_row_val((*row_ref).clone(), causal_value);
                return Ok(vec![row_val]);
            }
        };

        let all_raw_res = match self.k2v.read_batch(&batch_op).await {
            Err(e) => {
                tracing::error!(
                    "k2v read batch failed for {:?}, bucket {} with err: {}",
                    select,
                    self.bucket,
                    e
                );
                return Err(StorageError::Internal);
            }
            Ok(v) => v,
        };

        let row_vals = all_raw_res
            .into_iter()
            .fold(vec![], |mut acc, v| {
                acc.extend(v.items);
                acc
            })
            .into_iter()
            .zip(pk_list.into_iter())
            .map(|((sk, cv), pk)| causal_to_row_val(RowRef::new(&pk, &sk), cv))
            .collect::<Vec<_>>();

        Ok(row_vals)
    }
    async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
        let del_op = match select {
            Selector::Range {
                shard,
                sort_begin,
                sort_end,
            } => vec![k2v_client::BatchDeleteOp {
                partition_key: shard,
                prefix: None,
                start: Some(sort_begin),
                end: Some(sort_end),
                single_item: false,
            }],
            Selector::List(row_ref_list) => {
                // Insert null values with causality token = delete
                let batch_op = row_ref_list
                    .iter()
                    .map(|v| k2v_client::BatchInsertOp {
                        partition_key: &v.uid.shard,
                        sort_key: &v.uid.sort,
                        causality: v.causality.clone().map(|ct| ct.into()),
                        value: k2v_client::K2vValue::Tombstone,
                    })
                    .collect::<Vec<_>>();

                return match self.k2v.insert_batch(&batch_op).await {
                    Err(e) => {
                        tracing::error!("Unable to delete the list of values: {}", e);
                        Err(StorageError::Internal)
                    }
                    Ok(_) => Ok(()),
                };
            }
            Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchDeleteOp {
                partition_key: shard,
                prefix: Some(sort_prefix),
                start: None,
                end: None,
                single_item: false,
            }],
            Selector::Single(row_ref) => {
                // Insert null values with causality token = delete
                let batch_op = vec![k2v_client::BatchInsertOp {
                    partition_key: &row_ref.uid.shard,
                    sort_key: &row_ref.uid.sort,
                    causality: row_ref.causality.clone().map(|ct| ct.into()),
                    value: k2v_client::K2vValue::Tombstone,
                }];

                return match self.k2v.insert_batch(&batch_op).await {
                    Err(e) => {
                        tracing::error!("Unable to delete the list of values: {}", e);
                        Err(StorageError::Internal)
                    }
                    Ok(_) => Ok(()),
                };
            }
        };

        // Finally here we only have prefix & range
        match self.k2v.delete_batch(&del_op).await {
            Err(e) => {
                tracing::error!("delete batch error: {}", e);
                Err(StorageError::Internal)
            }
            Ok(_) => Ok(()),
        }
    }

    async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
        let batch_ops = values
            .iter()
            .map(|v| k2v_client::BatchInsertOp {
                partition_key: &v.row_ref.uid.shard,
                sort_key: &v.row_ref.uid.sort,
                causality: v.row_ref.causality.clone().map(|ct| ct.into()),
                value: v
                    .value
                    .iter()
                    .next()
                    .map(|cv| match cv {
                        Alternative::Value(buff) => k2v_client::K2vValue::Value(buff.clone()),
                        Alternative::Tombstone => k2v_client::K2vValue::Tombstone,
                    })
                    .unwrap_or(k2v_client::K2vValue::Tombstone),
            })
            .collect::<Vec<_>>();

        match self.k2v.insert_batch(&batch_ops).await {
            Err(e) => {
                tracing::error!("k2v can't insert some value: {}", e);
                Err(StorageError::Internal)
            }
            Ok(v) => Ok(v),
        }
    }
    async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
        loop {
            if let Some(ct) = &value.causality {
                match self
                    .k2v
                    .poll_item(&value.uid.shard, &value.uid.sort, ct.clone().into(), None)
                    .await
                {
                    Err(e) => {
                        tracing::error!("Unable to poll item: {}", e);
                        return Err(StorageError::Internal);
                    }
                    Ok(None) => continue,
                    Ok(Some(cv)) => return Ok(causal_to_row_val(value.clone(), cv)),
                }
            } else {
                match self.k2v.read_item(&value.uid.shard, &value.uid.sort).await {
                    Err(k2v_client::Error::NotFound) => {
                        self.k2v
                            .insert_item(&value.uid.shard, &value.uid.sort, vec![0u8], None)
                            .await
                            .map_err(|e| {
                                tracing::error!("Unable to insert item in polling logic: {}", e);
                                StorageError::Internal
                            })?;
                    }
                    Err(e) => {
                        tracing::error!("Unable to read item in polling logic: {}", e);
                        return Err(StorageError::Internal);
                    }
                    Ok(cv) => return Ok(causal_to_row_val(value.clone(), cv)),
                }
            }
        }
    }

    async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
        let maybe_out = self
            .s3
            .get_object()
            .bucket(self.bucket.to_string())
            .key(blob_ref.0.to_string())
            .send()
            .await;

        let object_output = match maybe_out {
            Ok(output) => output,
            Err(SdkError::ServiceError(x)) => match x.err() {
                GetObjectError::NoSuchKey(_) => return Err(StorageError::NotFound),
                e => {
                    tracing::warn!("Blob Fetch Error, Service Error: {}", e);
                    return Err(StorageError::Internal);
                }
            },
            Err(e) => {
                tracing::warn!("Blob Fetch Error, {}", e);
                return Err(StorageError::Internal);
            }
        };

        let buffer = match object_output.body.collect().await {
            Ok(aggreg) => aggreg.to_vec(),
            Err(e) => {
                tracing::warn!("Fetching body failed with {}", e);
                return Err(StorageError::Internal);
            }
        };

        let mut bv = BlobVal::new(blob_ref.clone(), buffer);
        if let Some(meta) = object_output.metadata {
            bv.meta = meta;
        }
        tracing::debug!("Fetched {}/{}", self.bucket, blob_ref.0);
        Ok(bv)
    }
    async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> {
        let streamable_value = s3::primitives::ByteStream::from(blob_val.value);

        let maybe_send = self
            .s3
            .put_object()
            .bucket(self.bucket.to_string())
            .key(blob_val.blob_ref.0.to_string())
            .set_metadata(Some(blob_val.meta))
            .body(streamable_value)
            .send()
            .await;

        match maybe_send {
            Err(e) => {
                tracing::error!("unable to send object: {}", e);
                Err(StorageError::Internal)
            }
            Ok(_) => {
                tracing::debug!("Inserted {}/{}", self.bucket, blob_val.blob_ref.0);
                Ok(())
            }
        }
    }
    async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
        let maybe_copy = self
            .s3
            .copy_object()
            .bucket(self.bucket.to_string())
            .key(dst.0.clone())
            .copy_source(format!("/{}/{}", self.bucket.to_string(), src.0.clone()))
            .send()
            .await;

        match maybe_copy {
            Err(e) => {
                tracing::error!(
                    "unable to copy object {} to {} (bucket: {}), error: {}",
                    src.0,
                    dst.0,
                    self.bucket,
                    e
                );
                Err(StorageError::Internal)
            }
            Ok(_) => {
                tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.bucket);
                Ok(())
            }
        }
    }
    async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
        let maybe_list = self
            .s3
            .list_objects_v2()
            .bucket(self.bucket.to_string())
            .prefix(prefix)
            .into_paginator()
            .send()
            .try_collect()
            .await;

        match maybe_list {
            Err(e) => {
                tracing::error!(
                    "listing prefix {} on bucket {} failed: {}",
                    prefix,
                    self.bucket,
                    e
                );
                Err(StorageError::Internal)
            }
            Ok(pagin_list_out) => Ok(pagin_list_out
                .into_iter()
                .map(|list_out| list_out.contents.unwrap_or(vec![]))
                .flatten()
                .map(|obj| BlobRef(obj.key.unwrap_or(String::new())))
                .collect::<Vec<_>>()),
        }
    }
    async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
        let maybe_delete = self
            .s3
            .delete_object()
            .bucket(self.bucket.to_string())
            .key(blob_ref.0.clone())
            .send()
            .await;

        match maybe_delete {
            Err(e) => {
                tracing::error!(
                    "unable to delete {} (bucket: {}), error {}",
                    blob_ref.0,
                    self.bucket,
                    e
                );
                Err(StorageError::Internal)
            }
            Ok(_) => {
                tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.bucket);
                Ok(())
            }
        }
    }
}