diff options
Diffstat (limited to 'src/storage/garage.rs')
-rw-r--r-- | src/storage/garage.rs | 33 |
1 files changed, 23 insertions, 10 deletions
diff --git a/src/storage/garage.rs b/src/storage/garage.rs index 90b84d6..709e729 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -105,6 +105,7 @@ fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) -> #[async_trait] impl IStore for GarageStore { async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> { + tracing::trace!(select=%select, command="row_fetch"); let (pk_list, batch_op) = match select { Selector::Range { shard, @@ -196,21 +197,26 @@ impl IStore for GarageStore { } Ok(v) => v, }; + //println!("fetch res -> {:?}", all_raw_res); - let row_vals = all_raw_res - .into_iter() - .fold(vec![], |mut acc, v| { - acc.extend(v.items); - acc - }) - .into_iter() - .zip(pk_list.into_iter()) - .map(|((sk, cv), pk)| causal_to_row_val(RowRef::new(&pk, &sk), cv)) - .collect::<Vec<_>>(); + let row_vals = + all_raw_res + .into_iter() + .zip(pk_list.into_iter()) + .fold(vec![], |mut acc, (page, pk)| { + page.items + .into_iter() + .map(|(sk, cv)| causal_to_row_val(RowRef::new(&pk, &sk), cv)) + .for_each(|rr| acc.push(rr)); + + acc + }); + tracing::debug!(fetch_count = row_vals.len(), command = "row_fetch"); Ok(row_vals) } async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { + tracing::trace!(select=%select, command="row_rm"); let del_op = match select { Selector::Range { shard, @@ -280,6 +286,7 @@ impl IStore for GarageStore { } async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> { + tracing::trace!(entries=%values.iter().map(|v| v.row_ref.to_string()).collect::<Vec<_>>().join(","), command="row_insert"); let batch_ops = values .iter() .map(|v| k2v_client::BatchInsertOp { @@ -307,6 +314,7 @@ impl IStore for GarageStore { } } async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> { + tracing::trace!(entry=%value, command="row_poll"); loop { if let Some(ct) = &value.causality { match self @@ -343,6 +351,7 @@ impl IStore for GarageStore { } async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> { + tracing::trace!(entry=%blob_ref, command="blob_fetch"); let maybe_out = self .s3 .get_object() @@ -382,6 +391,7 @@ impl IStore for GarageStore { Ok(bv) } async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { + tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert"); let streamable_value = s3::primitives::ByteStream::from(blob_val.value); let maybe_send = self @@ -406,6 +416,7 @@ impl IStore for GarageStore { } } async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> { + tracing::trace!(src=%src, dst=%dst, command="blob_copy"); let maybe_copy = self .s3 .copy_object() @@ -433,6 +444,7 @@ impl IStore for GarageStore { } } async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> { + tracing::trace!(prefix = prefix, command = "blob_list"); let maybe_list = self .s3 .list_objects_v2() @@ -462,6 +474,7 @@ impl IStore for GarageStore { } } async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { + tracing::trace!(entry=%blob_ref, command="blob_rm"); let maybe_delete = self .s3 .delete_object() |