aboutsummaryrefslogtreecommitdiff
path: root/src/storage
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2024-01-31 11:01:18 +0100
committerQuentin Dufour <quentin@deuxfleurs.fr>2024-01-31 11:01:18 +0100
commit22f0eb901ae1f4a38a93bcfc268ebe0f74a6482e (patch)
tree885d59c3bfb961b058b2ad7a7ec8915750c54a3b /src/storage
parentc27919a757ac15fe49393ef91d90a525a3e501ee (diff)
downloadaerogramme-22f0eb901ae1f4a38a93bcfc268ebe0f74a6482e.tar.gz
aerogramme-22f0eb901ae1f4a38a93bcfc268ebe0f74a6482e.zip
format + fix storage bug
Diffstat (limited to 'src/storage')
-rw-r--r--src/storage/garage.rs33
1 files changed, 23 insertions, 10 deletions
diff --git a/src/storage/garage.rs b/src/storage/garage.rs
index 90b84d6..709e729 100644
--- a/src/storage/garage.rs
+++ b/src/storage/garage.rs
@@ -105,6 +105,7 @@ fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) ->
#[async_trait]
impl IStore for GarageStore {
async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
+ tracing::trace!(select=%select, command="row_fetch");
let (pk_list, batch_op) = match select {
Selector::Range {
shard,
@@ -196,21 +197,26 @@ impl IStore for GarageStore {
}
Ok(v) => v,
};
+ //println!("fetch res -> {:?}", all_raw_res);
- let row_vals = all_raw_res
- .into_iter()
- .fold(vec![], |mut acc, v| {
- acc.extend(v.items);
- acc
- })
- .into_iter()
- .zip(pk_list.into_iter())
- .map(|((sk, cv), pk)| causal_to_row_val(RowRef::new(&pk, &sk), cv))
- .collect::<Vec<_>>();
+ let row_vals =
+ all_raw_res
+ .into_iter()
+ .zip(pk_list.into_iter())
+ .fold(vec![], |mut acc, (page, pk)| {
+ page.items
+ .into_iter()
+ .map(|(sk, cv)| causal_to_row_val(RowRef::new(&pk, &sk), cv))
+ .for_each(|rr| acc.push(rr));
+
+ acc
+ });
+ tracing::debug!(fetch_count = row_vals.len(), command = "row_fetch");
Ok(row_vals)
}
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
+ tracing::trace!(select=%select, command="row_rm");
let del_op = match select {
Selector::Range {
shard,
@@ -280,6 +286,7 @@ impl IStore for GarageStore {
}
async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
+ tracing::trace!(entries=%values.iter().map(|v| v.row_ref.to_string()).collect::<Vec<_>>().join(","), command="row_insert");
let batch_ops = values
.iter()
.map(|v| k2v_client::BatchInsertOp {
@@ -307,6 +314,7 @@ impl IStore for GarageStore {
}
}
async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
+ tracing::trace!(entry=%value, command="row_poll");
loop {
if let Some(ct) = &value.causality {
match self
@@ -343,6 +351,7 @@ impl IStore for GarageStore {
}
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
+ tracing::trace!(entry=%blob_ref, command="blob_fetch");
let maybe_out = self
.s3
.get_object()
@@ -382,6 +391,7 @@ impl IStore for GarageStore {
Ok(bv)
}
async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> {
+ tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert");
let streamable_value = s3::primitives::ByteStream::from(blob_val.value);
let maybe_send = self
@@ -406,6 +416,7 @@ impl IStore for GarageStore {
}
}
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
+ tracing::trace!(src=%src, dst=%dst, command="blob_copy");
let maybe_copy = self
.s3
.copy_object()
@@ -433,6 +444,7 @@ impl IStore for GarageStore {
}
}
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
+ tracing::trace!(prefix = prefix, command = "blob_list");
let maybe_list = self
.s3
.list_objects_v2()
@@ -462,6 +474,7 @@ impl IStore for GarageStore {
}
}
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
+ tracing::trace!(entry=%blob_ref, command="blob_rm");
let maybe_delete = self
.s3
.delete_object()