diff options
author | Quentin Dufour <quentin@deuxfleurs.fr> | 2023-12-27 14:58:28 +0100 |
---|---|---|
committer | Quentin Dufour <quentin@deuxfleurs.fr> | 2023-12-27 14:58:28 +0100 |
commit | 7ac24ad913fa081e1bd6f5b042b9da0173dad267 (patch) | |
tree | e8cb9d877ea0df530f6699d7d96dced66ff079d6 /src/storage/garage.rs | |
parent | 54c9736a247bb3534a285caa637c9afb052bc2dd (diff) | |
download | aerogramme-7ac24ad913fa081e1bd6f5b042b9da0173dad267.tar.gz aerogramme-7ac24ad913fa081e1bd6f5b042b9da0173dad267.zip |
cargo format
Diffstat (limited to 'src/storage/garage.rs')
-rw-r--r-- | src/storage/garage.rs | 252 |
1 files changed, 159 insertions, 93 deletions
diff --git a/src/storage/garage.rs b/src/storage/garage.rs index f9ba756..d08585f 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -1,10 +1,6 @@ use crate::storage::*; +use aws_sdk_s3::{self as s3, error::SdkError, operation::get_object::GetObjectError}; use serde::Serialize; -use aws_sdk_s3::{ - self as s3, - error::SdkError, - operation::get_object::GetObjectError, -}; #[derive(Clone, Debug, Serialize)] pub struct GarageConf { @@ -28,18 +24,18 @@ impl GarageBuilder { unicity.extend_from_slice(file!().as_bytes()); unicity.append(&mut rmp_serde::to_vec(&conf)?); Ok(Arc::new(Self { conf, unicity })) - } + } } #[async_trait] impl IBuilder for GarageBuilder { async fn build(&self) -> Result<Store, StorageError> { let s3_creds = s3::config::Credentials::new( - self.conf.aws_access_key_id.clone(), - self.conf.aws_secret_access_key.clone(), - None, - None, - "aerogramme" + self.conf.aws_access_key_id.clone(), + self.conf.aws_secret_access_key.clone(), + None, + None, + "aerogramme", ); let s3_config = aws_config::from_env() @@ -51,12 +47,12 @@ impl IBuilder for GarageBuilder { let s3_client = aws_sdk_s3::Client::new(&s3_config); let k2v_config = k2v_client::K2vClientConfig { - endpoint: self.conf.k2v_endpoint.clone(), - region: self.conf.region.clone(), - aws_access_key_id: self.conf.aws_access_key_id.clone(), - aws_secret_access_key: self.conf.aws_secret_access_key.clone(), - bucket: self.conf.bucket.clone(), - user_agent: None, + endpoint: self.conf.k2v_endpoint.clone(), + region: self.conf.region.clone(), + aws_access_key_id: self.conf.aws_access_key_id.clone(), + aws_secret_access_key: self.conf.aws_secret_access_key.clone(), + bucket: self.conf.bucket.clone(), + user_agent: None, }; let k2v_client = match k2v_client::K2vClient::new(k2v_config) { @@ -67,7 +63,7 @@ impl IBuilder for GarageBuilder { Ok(v) => v, }; - Ok(Box::new(GarageStore { + Ok(Box::new(GarageStore { bucket: self.conf.bucket.clone(), s3: s3_client, k2v: k2v_client, @@ -86,19 +82,30 @@ pub struct GarageStore { fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) -> RowVal { let new_row_ref = row_ref.with_causality(causal_value.causality.into()); - let row_values = causal_value.value.into_iter().map(|k2v_value| match k2v_value { - k2v_client::K2vValue::Tombstone => Alternative::Tombstone, - k2v_client::K2vValue::Value(v) => Alternative::Value(v), - }).collect::<Vec<_>>(); + let row_values = causal_value + .value + .into_iter() + .map(|k2v_value| match k2v_value { + k2v_client::K2vValue::Tombstone => Alternative::Tombstone, + k2v_client::K2vValue::Value(v) => Alternative::Value(v), + }) + .collect::<Vec<_>>(); - RowVal { row_ref: new_row_ref, value: row_values } + RowVal { + row_ref: new_row_ref, + value: row_values, + } } #[async_trait] impl IStore for GarageStore { async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> { let (pk_list, batch_op) = match select { - Selector::Range { shard, sort_begin, sort_end } => ( + Selector::Range { + shard, + sort_begin, + sort_end, + } => ( vec![shard.to_string()], vec![k2v_client::BatchReadOp { partition_key: shard, @@ -108,49 +115,71 @@ impl IStore for GarageStore { ..k2v_client::Filter::default() }, ..k2v_client::BatchReadOp::default() - }] + }], ), Selector::List(row_ref_list) => ( - row_ref_list.iter().map(|row_ref| row_ref.uid.shard.to_string()).collect::<Vec<_>>(), - row_ref_list.iter().map(|row_ref| k2v_client::BatchReadOp { - partition_key: &row_ref.uid.shard, + row_ref_list + .iter() + .map(|row_ref| row_ref.uid.shard.to_string()) + .collect::<Vec<_>>(), + row_ref_list + .iter() + .map(|row_ref| k2v_client::BatchReadOp { + partition_key: &row_ref.uid.shard, + filter: k2v_client::Filter { + start: Some(&row_ref.uid.sort), + ..k2v_client::Filter::default() + }, + single_item: true, + ..k2v_client::BatchReadOp::default() + }) + .collect::<Vec<_>>(), + ), + Selector::Prefix { shard, sort_prefix } => ( + vec![shard.to_string()], + vec![k2v_client::BatchReadOp { + partition_key: shard, filter: k2v_client::Filter { - start: Some(&row_ref.uid.sort), + prefix: Some(sort_prefix), ..k2v_client::Filter::default() }, - single_item: true, ..k2v_client::BatchReadOp::default() - }).collect::<Vec<_>>() + }], ), - Selector::Prefix { shard, sort_prefix } => ( - vec![shard.to_string()], - vec![k2v_client::BatchReadOp { - partition_key: shard, - filter: k2v_client::Filter { - prefix: Some(sort_prefix), - ..k2v_client::Filter::default() - }, - ..k2v_client::BatchReadOp::default() - }]), Selector::Single(row_ref) => { - let causal_value = match self.k2v.read_item(&row_ref.uid.shard, &row_ref.uid.sort).await { + let causal_value = match self + .k2v + .read_item(&row_ref.uid.shard, &row_ref.uid.sort) + .await + { Err(e) => { - tracing::error!("K2V read item shard={}, sort={}, bucket={} failed: {}", row_ref.uid.shard, row_ref.uid.sort, self.bucket, e); + tracing::error!( + "K2V read item shard={}, sort={}, bucket={} failed: {}", + row_ref.uid.shard, + row_ref.uid.sort, + self.bucket, + e + ); return Err(StorageError::Internal); - }, + } Ok(v) => v, }; let row_val = causal_to_row_val((*row_ref).clone(), causal_value); - return Ok(vec![row_val]) - }, + return Ok(vec![row_val]); + } }; let all_raw_res = match self.k2v.read_batch(&batch_op).await { Err(e) => { - tracing::error!("k2v read batch failed for {:?}, bucket {} with err: {}", select, self.bucket, e); + tracing::error!( + "k2v read batch failed for {:?}, bucket {} with err: {}", + select, + self.bucket, + e + ); return Err(StorageError::Internal); - }, + } Ok(v) => v, }; @@ -163,13 +192,17 @@ impl IStore for GarageStore { .into_iter() .zip(pk_list.into_iter()) .map(|((sk, cv), pk)| causal_to_row_val(RowRef::new(&pk, &sk), cv)) - .collect::<Vec<_>>(); + .collect::<Vec<_>>(); Ok(row_vals) } async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { let del_op = match select { - Selector::Range { shard, sort_begin, sort_end } => vec![k2v_client::BatchDeleteOp { + Selector::Range { + shard, + sort_begin, + sort_end, + } => vec![k2v_client::BatchDeleteOp { partition_key: shard, prefix: None, start: Some(sort_begin), @@ -178,21 +211,24 @@ impl IStore for GarageStore { }], Selector::List(row_ref_list) => { // Insert null values with causality token = delete - let batch_op = row_ref_list.iter().map(|v| k2v_client::BatchInsertOp { - partition_key: &v.uid.shard, - sort_key: &v.uid.sort, - causality: v.causality.clone().map(|ct| ct.into()), - value: k2v_client::K2vValue::Tombstone, - }).collect::<Vec<_>>(); - + let batch_op = row_ref_list + .iter() + .map(|v| k2v_client::BatchInsertOp { + partition_key: &v.uid.shard, + sort_key: &v.uid.sort, + causality: v.causality.clone().map(|ct| ct.into()), + value: k2v_client::K2vValue::Tombstone, + }) + .collect::<Vec<_>>(); + return match self.k2v.insert_batch(&batch_op).await { Err(e) => { tracing::error!("Unable to delete the list of values: {}", e); Err(StorageError::Internal) - }, + } Ok(_) => Ok(()), }; - }, + } Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchDeleteOp { partition_key: shard, prefix: Some(sort_prefix), @@ -208,15 +244,15 @@ impl IStore for GarageStore { causality: row_ref.causality.clone().map(|ct| ct.into()), value: k2v_client::K2vValue::Tombstone, }]; - + return match self.k2v.insert_batch(&batch_op).await { Err(e) => { tracing::error!("Unable to delete the list of values: {}", e); Err(StorageError::Internal) - }, + } Ok(_) => Ok(()), }; - }, + } }; // Finally here we only have prefix & range @@ -224,34 +260,46 @@ impl IStore for GarageStore { Err(e) => { tracing::error!("delete batch error: {}", e); Err(StorageError::Internal) - }, + } Ok(_) => Ok(()), } } async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> { - let batch_ops = values.iter().map(|v| k2v_client::BatchInsertOp { - partition_key: &v.row_ref.uid.shard, - sort_key: &v.row_ref.uid.sort, - causality: v.row_ref.causality.clone().map(|ct| ct.into()), - value: v.value.iter().next().map(|cv| match cv { - Alternative::Value(buff) => k2v_client::K2vValue::Value(buff.clone()), - Alternative::Tombstone => k2v_client::K2vValue::Tombstone, - }).unwrap_or(k2v_client::K2vValue::Tombstone) - }).collect::<Vec<_>>(); + let batch_ops = values + .iter() + .map(|v| k2v_client::BatchInsertOp { + partition_key: &v.row_ref.uid.shard, + sort_key: &v.row_ref.uid.sort, + causality: v.row_ref.causality.clone().map(|ct| ct.into()), + value: v + .value + .iter() + .next() + .map(|cv| match cv { + Alternative::Value(buff) => k2v_client::K2vValue::Value(buff.clone()), + Alternative::Tombstone => k2v_client::K2vValue::Tombstone, + }) + .unwrap_or(k2v_client::K2vValue::Tombstone), + }) + .collect::<Vec<_>>(); match self.k2v.insert_batch(&batch_ops).await { Err(e) => { tracing::error!("k2v can't insert some value: {}", e); Err(StorageError::Internal) - }, + } Ok(v) => Ok(v), } } async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> { loop { if let Some(ct) = &value.causality { - match self.k2v.poll_item(&value.uid.shard, &value.uid.sort, ct.clone().into(), None).await { + match self + .k2v + .poll_item(&value.uid.shard, &value.uid.sort, ct.clone().into(), None) + .await + { Err(e) => { tracing::error!("Unable to poll item: {}", e); return Err(StorageError::Internal); @@ -262,8 +310,7 @@ impl IStore for GarageStore { } else { match self.k2v.read_item(&value.uid.shard, &value.uid.sort).await { Err(k2v_client::Error::NotFound) => { - self - .k2v + self.k2v .insert_item(&value.uid.shard, &value.uid.sort, vec![0u8], None) .await .map_err(|e| { @@ -273,8 +320,8 @@ impl IStore for GarageStore { } Err(e) => { tracing::error!("Unable to read item in polling logic: {}", e); - return Err(StorageError::Internal) - }, + return Err(StorageError::Internal); + } Ok(cv) => return Ok(causal_to_row_val(value.clone(), cv)), } } @@ -282,7 +329,8 @@ impl IStore for GarageStore { } async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> { - let maybe_out = self.s3 + let maybe_out = self + .s3 .get_object() .bucket(self.bucket.to_string()) .key(blob_ref.0.to_string()) @@ -296,12 +344,12 @@ impl IStore for GarageStore { e => { tracing::warn!("Blob Fetch Error, Service Error: {}", e); return Err(StorageError::Internal); - }, + } }, Err(e) => { tracing::warn!("Blob Fetch Error, {}", e); return Err(StorageError::Internal); - }, + } }; let buffer = match object_output.body.collect().await { @@ -316,9 +364,10 @@ impl IStore for GarageStore { Ok(BlobVal::new(blob_ref.clone(), buffer)) } async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { - let streamable_value = s3::primitives::ByteStream::from(blob_val.value); + let streamable_value = s3::primitives::ByteStream::from(blob_val.value); - let maybe_send = self.s3 + let maybe_send = self + .s3 .put_object() .bucket(self.bucket.to_string()) .key(blob_val.blob_ref.0.to_string()) @@ -338,7 +387,8 @@ impl IStore for GarageStore { } } async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> { - let maybe_copy = self.s3 + let maybe_copy = self + .s3 .copy_object() .bucket(self.bucket.to_string()) .key(dst.0.clone()) @@ -348,18 +398,24 @@ impl IStore for GarageStore { match maybe_copy { Err(e) => { - tracing::error!("unable to copy object {} to {} (bucket: {}), error: {}", src.0, dst.0, self.bucket, e); + tracing::error!( + "unable to copy object {} to {} (bucket: {}), error: {}", + src.0, + dst.0, + self.bucket, + e + ); Err(StorageError::Internal) - }, + } Ok(_) => { tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.bucket); Ok(()) } } - } async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> { - let maybe_list = self.s3 + let maybe_list = self + .s3 .list_objects_v2() .bucket(self.bucket.to_string()) .prefix(prefix) @@ -370,7 +426,12 @@ impl IStore for GarageStore { match maybe_list { Err(e) => { - tracing::error!("listing prefix {} on bucket {} failed: {}", prefix, self.bucket, e); + tracing::error!( + "listing prefix {} on bucket {} failed: {}", + prefix, + self.bucket, + e + ); Err(StorageError::Internal) } Ok(pagin_list_out) => Ok(pagin_list_out @@ -382,7 +443,8 @@ impl IStore for GarageStore { } } async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { - let maybe_delete = self.s3 + let maybe_delete = self + .s3 .delete_object() .bucket(self.bucket.to_string()) .key(blob_ref.0.clone()) @@ -391,9 +453,14 @@ impl IStore for GarageStore { match maybe_delete { Err(e) => { - tracing::error!("unable to delete {} (bucket: {}), error {}", blob_ref.0, self.bucket, e); + tracing::error!( + "unable to delete {} (bucket: {}), error {}", + blob_ref.0, + self.bucket, + e + ); Err(StorageError::Internal) - }, + } Ok(_) => { tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.bucket); Ok(()) @@ -401,4 +468,3 @@ impl IStore for GarageStore { } } } - |