diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/storage/garage.rs | 118 |
1 files changed, 98 insertions, 20 deletions
diff --git a/src/storage/garage.rs b/src/storage/garage.rs index ec26e80..bc123b5 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -34,7 +34,7 @@ impl GarageBuilder { #[async_trait] impl IBuilder for GarageBuilder { async fn build(&self) -> Result<Store, StorageError> { - let creds = s3::config::Credentials::new( + let s3_creds = s3::config::Credentials::new( self.conf.aws_access_key_id.clone(), self.conf.aws_secret_access_key.clone(), None, @@ -42,17 +42,35 @@ impl IBuilder for GarageBuilder { "aerogramme" ); - let config = aws_config::from_env() + let s3_config = aws_config::from_env() .region(aws_config::Region::new(self.conf.region.clone())) - .credentials_provider(creds) + .credentials_provider(s3_creds) .endpoint_url(self.conf.s3_endpoint.clone()) .load() .await; + let s3_client = aws_sdk_s3::Client::new(&s3_config); + + let k2v_config = k2v_client::K2vClientConfig { + endpoint: self.conf.k2v_endpoint.clone(), + region: self.conf.region.clone(), + aws_access_key_id: self.conf.aws_access_key_id.clone(), + aws_secret_access_key: self.conf.aws_secret_access_key.clone(), + bucket: self.conf.bucket.clone(), + user_agent: None, + }; + + let k2v_client = match k2v_client::K2vClient::new(k2v_config) { + Err(e) => { + tracing::error!("unable to build k2v client: {}", e); + return Err(StorageError::Internal); + } + Ok(v) => v, + }; - let s3_client = aws_sdk_s3::Client::new(&config); Ok(Box::new(GarageStore { - s3_bucket: self.conf.bucket.clone(), - s3: s3_client + bucket: self.conf.bucket.clone(), + s3: s3_client, + k2v: k2v_client, })) } fn unique(&self) -> UnicityBuffer { @@ -61,13 +79,73 @@ impl IBuilder for GarageBuilder { } pub struct GarageStore { - s3_bucket: String, + bucket: String, s3: s3::Client, + k2v: k2v_client::K2vClient, +} + +fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) -> RowVal { + let new_row_ref = row_ref.with_causality(causal_value.causality.into()); + let row_values = causal_value.value.into_iter().map(|k2v_value| match k2v_value { + k2v_client::K2vValue::Tombstone => Alternative::Tombstone, + k2v_client::K2vValue::Value(v) => Alternative::Value(v), + }).collect::<Vec<_>>(); + + RowVal { row_ref: new_row_ref, value: row_values } } #[async_trait] impl IStore for GarageStore { async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> { + let batch_op = match select { + Selector::Range { shard, sort_begin, sort_end } => vec![k2v_client::BatchReadOp { + partition_key: shard, + filter: k2v_client::Filter { + start: Some(sort_begin), + end: Some(sort_end), + ..k2v_client::Filter::default() + }, + ..k2v_client::BatchReadOp::default() + }], + Selector::List(row_ref_list) => row_ref_list.iter().map(|row_ref| k2v_client::BatchReadOp { + partition_key: &row_ref.uid.shard, + filter: k2v_client::Filter { + start: Some(&row_ref.uid.sort), + ..k2v_client::Filter::default() + }, + single_item: true, + ..k2v_client::BatchReadOp::default() + }).collect::<Vec<_>>(), + Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchReadOp { + partition_key: shard, + filter: k2v_client::Filter { + prefix: Some(sort_prefix), + ..k2v_client::Filter::default() + }, + ..k2v_client::BatchReadOp::default() + }], + Selector::Single(row_ref) => { + let causal_value = match self.k2v.read_item(&row_ref.uid.shard, &row_ref.uid.sort).await { + Err(e) => { + tracing::error!("K2V read item shard={}, sort={}, bucket={} failed: {}", row_ref.uid.shard, row_ref.uid.sort, self.bucket, e); + return Err(StorageError::Internal); + }, + Ok(v) => v, + }; + + let row_val = causal_to_row_val((*row_ref).clone(), causal_value); + return Ok(vec![row_val]) + }, + }; + + let all_res = match self.k2v.read_batch(&batch_op).await { + Err(e) => { + tracing::error!("k2v read batch failed for {:?}, bucket {} with err: {}", select, self.bucket, e); + return Err(StorageError::Internal); + }, + Ok(v) => v, + }; + unimplemented!(); } async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { @@ -89,7 +167,7 @@ impl IStore for GarageStore { async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> { let maybe_out = self.s3 .get_object() - .bucket(self.s3_bucket.to_string()) + .bucket(self.bucket.to_string()) .key(blob_ref.0.to_string()) .send() .await; @@ -117,7 +195,7 @@ impl IStore for GarageStore { } }; - tracing::debug!("Fetched {}/{}", self.s3_bucket, blob_ref.0); + tracing::debug!("Fetched {}/{}", self.bucket, blob_ref.0); Ok(BlobVal::new(blob_ref.clone(), buffer)) } async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { @@ -125,7 +203,7 @@ impl IStore for GarageStore { let maybe_send = self.s3 .put_object() - .bucket(self.s3_bucket.to_string()) + .bucket(self.bucket.to_string()) .key(blob_val.blob_ref.0.to_string()) .body(streamable_value) .send() @@ -137,7 +215,7 @@ impl IStore for GarageStore { Err(StorageError::Internal) } Ok(_) => { - tracing::debug!("Inserted {}/{}", self.s3_bucket, blob_val.blob_ref.0); + tracing::debug!("Inserted {}/{}", self.bucket, blob_val.blob_ref.0); Ok(()) } } @@ -145,19 +223,19 @@ impl IStore for GarageStore { async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> { let maybe_copy = self.s3 .copy_object() - .bucket(self.s3_bucket.to_string()) + .bucket(self.bucket.to_string()) .key(dst.0.clone()) - .copy_source(format!("/{}/{}", self.s3_bucket.to_string(), src.0.clone())) + .copy_source(format!("/{}/{}", self.bucket.to_string(), src.0.clone())) .send() .await; match maybe_copy { Err(e) => { - tracing::error!("unable to copy object {} to {} (bucket: {}), error: {}", src.0, dst.0, self.s3_bucket, e); + tracing::error!("unable to copy object {} to {} (bucket: {}), error: {}", src.0, dst.0, self.bucket, e); Err(StorageError::Internal) }, Ok(_) => { - tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.s3_bucket); + tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.bucket); Ok(()) } } @@ -166,7 +244,7 @@ impl IStore for GarageStore { async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> { let maybe_list = self.s3 .list_objects_v2() - .bucket(self.s3_bucket.to_string()) + .bucket(self.bucket.to_string()) .prefix(prefix) .into_paginator() .send() @@ -175,7 +253,7 @@ impl IStore for GarageStore { match maybe_list { Err(e) => { - tracing::error!("listing prefix {} on bucket {} failed: {}", prefix, self.s3_bucket, e); + tracing::error!("listing prefix {} on bucket {} failed: {}", prefix, self.bucket, e); Err(StorageError::Internal) } Ok(pagin_list_out) => Ok(pagin_list_out @@ -189,18 +267,18 @@ impl IStore for GarageStore { async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { let maybe_delete = self.s3 .delete_object() - .bucket(self.s3_bucket.to_string()) + .bucket(self.bucket.to_string()) .key(blob_ref.0.clone()) .send() .await; match maybe_delete { Err(e) => { - tracing::error!("unable to delete {} (bucket: {}), error {}", blob_ref.0, self.s3_bucket, e); + tracing::error!("unable to delete {} (bucket: {}), error {}", blob_ref.0, self.bucket, e); Err(StorageError::Internal) }, Ok(_) => { - tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.s3_bucket); + tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.bucket); Ok(()) } } |