From 18bba784eee2331e7f27fe31b89e7c674e24ded0 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 26 Dec 2023 18:33:56 +0100 Subject: insert logic --- src/storage/garage.rs | 82 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 58 insertions(+), 24 deletions(-) (limited to 'src/storage') diff --git a/src/storage/garage.rs b/src/storage/garage.rs index bc123b5..665a0c6 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -97,33 +97,41 @@ fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) -> #[async_trait] impl IStore for GarageStore { async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result, StorageError> { - let batch_op = match select { - Selector::Range { shard, sort_begin, sort_end } => vec![k2v_client::BatchReadOp { - partition_key: shard, - filter: k2v_client::Filter { - start: Some(sort_begin), - end: Some(sort_end), - ..k2v_client::Filter::default() - }, - ..k2v_client::BatchReadOp::default() - }], - Selector::List(row_ref_list) => row_ref_list.iter().map(|row_ref| k2v_client::BatchReadOp { - partition_key: &row_ref.uid.shard, - filter: k2v_client::Filter { - start: Some(&row_ref.uid.sort), - ..k2v_client::Filter::default() - }, - single_item: true, - ..k2v_client::BatchReadOp::default() - }).collect::>(), - Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchReadOp { + let (pk_list, batch_op) = match select { + Selector::Range { shard, sort_begin, sort_end } => ( + vec![shard.to_string()], + vec![k2v_client::BatchReadOp { + partition_key: shard, + filter: k2v_client::Filter { + start: Some(sort_begin), + end: Some(sort_end), + ..k2v_client::Filter::default() + }, + ..k2v_client::BatchReadOp::default() + }] + ), + Selector::List(row_ref_list) => ( + row_ref_list.iter().map(|row_ref| row_ref.uid.shard.to_string()).collect::>(), + row_ref_list.iter().map(|row_ref| k2v_client::BatchReadOp { + partition_key: &row_ref.uid.shard, + filter: k2v_client::Filter { + start: Some(&row_ref.uid.sort), + ..k2v_client::Filter::default() + }, + single_item: true, + ..k2v_client::BatchReadOp::default() + }).collect::>() + ), + Selector::Prefix { shard, sort_prefix } => ( + vec![shard.to_string()], + vec![k2v_client::BatchReadOp { partition_key: shard, filter: k2v_client::Filter { prefix: Some(sort_prefix), ..k2v_client::Filter::default() }, ..k2v_client::BatchReadOp::default() - }], + }]), Selector::Single(row_ref) => { let causal_value = match self.k2v.read_item(&row_ref.uid.shard, &row_ref.uid.sort).await { Err(e) => { @@ -138,7 +146,7 @@ impl IStore for GarageStore { }, }; - let all_res = match self.k2v.read_batch(&batch_op).await { + let all_raw_res = match self.k2v.read_batch(&batch_op).await { Err(e) => { tracing::error!("k2v read batch failed for {:?}, bucket {} with err: {}", select, self.bucket, e); return Err(StorageError::Internal); @@ -146,15 +154,41 @@ impl IStore for GarageStore { Ok(v) => v, }; - unimplemented!(); + let row_vals = all_raw_res + .into_iter() + .fold(vec![], |mut acc, v| { + acc.extend(v.items); + acc + }) + .into_iter() + .zip(pk_list.into_iter()) + .map(|((sk, cv), pk)| causal_to_row_val(RowRef::new(&pk, &sk), cv)) + .collect::>(); + + Ok(row_vals) } async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { unimplemented!(); } async fn row_insert(&self, values: Vec) -> Result<(), StorageError> { - unimplemented!(); + let batch_ops = values.iter().map(|v| k2v_client::BatchInsertOp { + partition_key: &v.row_ref.uid.shard, + sort_key: &v.row_ref.uid.sort, + causality: v.row_ref.causality.clone().map(|ct| ct.into()), + value: v.value.iter().next().map(|cv| match cv { + Alternative::Value(buff) => k2v_client::K2vValue::Value(buff.clone()), + Alternative::Tombstone => k2v_client::K2vValue::Tombstone, + }).unwrap_or(k2v_client::K2vValue::Tombstone) + }).collect::>(); + match self.k2v.insert_batch(&batch_ops).await { + Err(e) => { + tracing::error!("k2v can't insert some value: {}", e); + Err(StorageError::Internal) + }, + Ok(v) => Ok(v), + } } async fn row_poll(&self, value: &RowRef) -> Result { unimplemented!(); -- cgit v1.2.3