aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/storage/garage.rs82
1 files changed, 58 insertions, 24 deletions
diff --git a/src/storage/garage.rs b/src/storage/garage.rs
index bc123b5..665a0c6 100644
--- a/src/storage/garage.rs
+++ b/src/storage/garage.rs
@@ -97,33 +97,41 @@ fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) ->
#[async_trait]
impl IStore for GarageStore {
async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
- let batch_op = match select {
- Selector::Range { shard, sort_begin, sort_end } => vec![k2v_client::BatchReadOp {
- partition_key: shard,
- filter: k2v_client::Filter {
- start: Some(sort_begin),
- end: Some(sort_end),
- ..k2v_client::Filter::default()
- },
- ..k2v_client::BatchReadOp::default()
- }],
- Selector::List(row_ref_list) => row_ref_list.iter().map(|row_ref| k2v_client::BatchReadOp {
- partition_key: &row_ref.uid.shard,
- filter: k2v_client::Filter {
- start: Some(&row_ref.uid.sort),
- ..k2v_client::Filter::default()
- },
- single_item: true,
- ..k2v_client::BatchReadOp::default()
- }).collect::<Vec<_>>(),
- Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchReadOp {
+ let (pk_list, batch_op) = match select {
+ Selector::Range { shard, sort_begin, sort_end } => (
+ vec![shard.to_string()],
+ vec![k2v_client::BatchReadOp {
+ partition_key: shard,
+ filter: k2v_client::Filter {
+ start: Some(sort_begin),
+ end: Some(sort_end),
+ ..k2v_client::Filter::default()
+ },
+ ..k2v_client::BatchReadOp::default()
+ }]
+ ),
+ Selector::List(row_ref_list) => (
+ row_ref_list.iter().map(|row_ref| row_ref.uid.shard.to_string()).collect::<Vec<_>>(),
+ row_ref_list.iter().map(|row_ref| k2v_client::BatchReadOp {
+ partition_key: &row_ref.uid.shard,
+ filter: k2v_client::Filter {
+ start: Some(&row_ref.uid.sort),
+ ..k2v_client::Filter::default()
+ },
+ single_item: true,
+ ..k2v_client::BatchReadOp::default()
+ }).collect::<Vec<_>>()
+ ),
+ Selector::Prefix { shard, sort_prefix } => (
+ vec![shard.to_string()],
+ vec![k2v_client::BatchReadOp {
partition_key: shard,
filter: k2v_client::Filter {
prefix: Some(sort_prefix),
..k2v_client::Filter::default()
},
..k2v_client::BatchReadOp::default()
- }],
+ }]),
Selector::Single(row_ref) => {
let causal_value = match self.k2v.read_item(&row_ref.uid.shard, &row_ref.uid.sort).await {
Err(e) => {
@@ -138,7 +146,7 @@ impl IStore for GarageStore {
},
};
- let all_res = match self.k2v.read_batch(&batch_op).await {
+ let all_raw_res = match self.k2v.read_batch(&batch_op).await {
Err(e) => {
tracing::error!("k2v read batch failed for {:?}, bucket {} with err: {}", select, self.bucket, e);
return Err(StorageError::Internal);
@@ -146,15 +154,41 @@ impl IStore for GarageStore {
Ok(v) => v,
};
- unimplemented!();
+ let row_vals = all_raw_res
+ .into_iter()
+ .fold(vec![], |mut acc, v| {
+ acc.extend(v.items);
+ acc
+ })
+ .into_iter()
+ .zip(pk_list.into_iter())
+ .map(|((sk, cv), pk)| causal_to_row_val(RowRef::new(&pk, &sk), cv))
+ .collect::<Vec<_>>();
+
+ Ok(row_vals)
}
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
unimplemented!();
}
async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
- unimplemented!();
+ let batch_ops = values.iter().map(|v| k2v_client::BatchInsertOp {
+ partition_key: &v.row_ref.uid.shard,
+ sort_key: &v.row_ref.uid.sort,
+ causality: v.row_ref.causality.clone().map(|ct| ct.into()),
+ value: v.value.iter().next().map(|cv| match cv {
+ Alternative::Value(buff) => k2v_client::K2vValue::Value(buff.clone()),
+ Alternative::Tombstone => k2v_client::K2vValue::Tombstone,
+ }).unwrap_or(k2v_client::K2vValue::Tombstone)
+ }).collect::<Vec<_>>();
+ match self.k2v.insert_batch(&batch_ops).await {
+ Err(e) => {
+ tracing::error!("k2v can't insert some value: {}", e);
+ Err(StorageError::Internal)
+ },
+ Ok(v) => Ok(v),
+ }
}
async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
unimplemented!();