aboutsummaryrefslogtreecommitdiff
path: root/src/storage/garage.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/storage/garage.rs')
-rw-r--r--src/storage/garage.rs118
1 files changed, 98 insertions, 20 deletions
diff --git a/src/storage/garage.rs b/src/storage/garage.rs
index ec26e80..bc123b5 100644
--- a/src/storage/garage.rs
+++ b/src/storage/garage.rs
@@ -34,7 +34,7 @@ impl GarageBuilder {
#[async_trait]
impl IBuilder for GarageBuilder {
async fn build(&self) -> Result<Store, StorageError> {
- let creds = s3::config::Credentials::new(
+ let s3_creds = s3::config::Credentials::new(
self.conf.aws_access_key_id.clone(),
self.conf.aws_secret_access_key.clone(),
None,
@@ -42,17 +42,35 @@ impl IBuilder for GarageBuilder {
"aerogramme"
);
- let config = aws_config::from_env()
+ let s3_config = aws_config::from_env()
.region(aws_config::Region::new(self.conf.region.clone()))
- .credentials_provider(creds)
+ .credentials_provider(s3_creds)
.endpoint_url(self.conf.s3_endpoint.clone())
.load()
.await;
+ let s3_client = aws_sdk_s3::Client::new(&s3_config);
+
+ let k2v_config = k2v_client::K2vClientConfig {
+ endpoint: self.conf.k2v_endpoint.clone(),
+ region: self.conf.region.clone(),
+ aws_access_key_id: self.conf.aws_access_key_id.clone(),
+ aws_secret_access_key: self.conf.aws_secret_access_key.clone(),
+ bucket: self.conf.bucket.clone(),
+ user_agent: None,
+ };
+
+ let k2v_client = match k2v_client::K2vClient::new(k2v_config) {
+ Err(e) => {
+ tracing::error!("unable to build k2v client: {}", e);
+ return Err(StorageError::Internal);
+ }
+ Ok(v) => v,
+ };
- let s3_client = aws_sdk_s3::Client::new(&config);
Ok(Box::new(GarageStore {
- s3_bucket: self.conf.bucket.clone(),
- s3: s3_client
+ bucket: self.conf.bucket.clone(),
+ s3: s3_client,
+ k2v: k2v_client,
}))
}
fn unique(&self) -> UnicityBuffer {
@@ -61,13 +79,73 @@ impl IBuilder for GarageBuilder {
}
pub struct GarageStore {
- s3_bucket: String,
+ bucket: String,
s3: s3::Client,
+ k2v: k2v_client::K2vClient,
+}
+
+fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) -> RowVal {
+ let new_row_ref = row_ref.with_causality(causal_value.causality.into());
+ let row_values = causal_value.value.into_iter().map(|k2v_value| match k2v_value {
+ k2v_client::K2vValue::Tombstone => Alternative::Tombstone,
+ k2v_client::K2vValue::Value(v) => Alternative::Value(v),
+ }).collect::<Vec<_>>();
+
+ RowVal { row_ref: new_row_ref, value: row_values }
}
#[async_trait]
impl IStore for GarageStore {
async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
+ let batch_op = match select {
+ Selector::Range { shard, sort_begin, sort_end } => vec![k2v_client::BatchReadOp {
+ partition_key: shard,
+ filter: k2v_client::Filter {
+ start: Some(sort_begin),
+ end: Some(sort_end),
+ ..k2v_client::Filter::default()
+ },
+ ..k2v_client::BatchReadOp::default()
+ }],
+ Selector::List(row_ref_list) => row_ref_list.iter().map(|row_ref| k2v_client::BatchReadOp {
+ partition_key: &row_ref.uid.shard,
+ filter: k2v_client::Filter {
+ start: Some(&row_ref.uid.sort),
+ ..k2v_client::Filter::default()
+ },
+ single_item: true,
+ ..k2v_client::BatchReadOp::default()
+ }).collect::<Vec<_>>(),
+ Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchReadOp {
+ partition_key: shard,
+ filter: k2v_client::Filter {
+ prefix: Some(sort_prefix),
+ ..k2v_client::Filter::default()
+ },
+ ..k2v_client::BatchReadOp::default()
+ }],
+ Selector::Single(row_ref) => {
+ let causal_value = match self.k2v.read_item(&row_ref.uid.shard, &row_ref.uid.sort).await {
+ Err(e) => {
+ tracing::error!("K2V read item shard={}, sort={}, bucket={} failed: {}", row_ref.uid.shard, row_ref.uid.sort, self.bucket, e);
+ return Err(StorageError::Internal);
+ },
+ Ok(v) => v,
+ };
+
+ let row_val = causal_to_row_val((*row_ref).clone(), causal_value);
+ return Ok(vec![row_val])
+ },
+ };
+
+ let all_res = match self.k2v.read_batch(&batch_op).await {
+ Err(e) => {
+ tracing::error!("k2v read batch failed for {:?}, bucket {} with err: {}", select, self.bucket, e);
+ return Err(StorageError::Internal);
+ },
+ Ok(v) => v,
+ };
+
unimplemented!();
}
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
@@ -89,7 +167,7 @@ impl IStore for GarageStore {
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
let maybe_out = self.s3
.get_object()
- .bucket(self.s3_bucket.to_string())
+ .bucket(self.bucket.to_string())
.key(blob_ref.0.to_string())
.send()
.await;
@@ -117,7 +195,7 @@ impl IStore for GarageStore {
}
};
- tracing::debug!("Fetched {}/{}", self.s3_bucket, blob_ref.0);
+ tracing::debug!("Fetched {}/{}", self.bucket, blob_ref.0);
Ok(BlobVal::new(blob_ref.clone(), buffer))
}
async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> {
@@ -125,7 +203,7 @@ impl IStore for GarageStore {
let maybe_send = self.s3
.put_object()
- .bucket(self.s3_bucket.to_string())
+ .bucket(self.bucket.to_string())
.key(blob_val.blob_ref.0.to_string())
.body(streamable_value)
.send()
@@ -137,7 +215,7 @@ impl IStore for GarageStore {
Err(StorageError::Internal)
}
Ok(_) => {
- tracing::debug!("Inserted {}/{}", self.s3_bucket, blob_val.blob_ref.0);
+ tracing::debug!("Inserted {}/{}", self.bucket, blob_val.blob_ref.0);
Ok(())
}
}
@@ -145,19 +223,19 @@ impl IStore for GarageStore {
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
let maybe_copy = self.s3
.copy_object()
- .bucket(self.s3_bucket.to_string())
+ .bucket(self.bucket.to_string())
.key(dst.0.clone())
- .copy_source(format!("/{}/{}", self.s3_bucket.to_string(), src.0.clone()))
+ .copy_source(format!("/{}/{}", self.bucket.to_string(), src.0.clone()))
.send()
.await;
match maybe_copy {
Err(e) => {
- tracing::error!("unable to copy object {} to {} (bucket: {}), error: {}", src.0, dst.0, self.s3_bucket, e);
+ tracing::error!("unable to copy object {} to {} (bucket: {}), error: {}", src.0, dst.0, self.bucket, e);
Err(StorageError::Internal)
},
Ok(_) => {
- tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.s3_bucket);
+ tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.bucket);
Ok(())
}
}
@@ -166,7 +244,7 @@ impl IStore for GarageStore {
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
let maybe_list = self.s3
.list_objects_v2()
- .bucket(self.s3_bucket.to_string())
+ .bucket(self.bucket.to_string())
.prefix(prefix)
.into_paginator()
.send()
@@ -175,7 +253,7 @@ impl IStore for GarageStore {
match maybe_list {
Err(e) => {
- tracing::error!("listing prefix {} on bucket {} failed: {}", prefix, self.s3_bucket, e);
+ tracing::error!("listing prefix {} on bucket {} failed: {}", prefix, self.bucket, e);
Err(StorageError::Internal)
}
Ok(pagin_list_out) => Ok(pagin_list_out
@@ -189,18 +267,18 @@ impl IStore for GarageStore {
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
let maybe_delete = self.s3
.delete_object()
- .bucket(self.s3_bucket.to_string())
+ .bucket(self.bucket.to_string())
.key(blob_ref.0.clone())
.send()
.await;
match maybe_delete {
Err(e) => {
- tracing::error!("unable to delete {} (bucket: {}), error {}", blob_ref.0, self.s3_bucket, e);
+ tracing::error!("unable to delete {} (bucket: {}), error {}", blob_ref.0, self.bucket, e);
Err(StorageError::Internal)
},
Ok(_) => {
- tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.s3_bucket);
+ tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.bucket);
Ok(())
}
}