From 7ac24ad913fa081e1bd6f5b042b9da0173dad267 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 27 Dec 2023 14:58:28 +0100 Subject: cargo format --- src/storage/garage.rs | 252 ++++++++++++++++++++++++++++++----------------- src/storage/in_memory.rs | 71 ++++++++----- src/storage/mod.rs | 41 +++++--- 3 files changed, 232 insertions(+), 132 deletions(-) (limited to 'src/storage') diff --git a/src/storage/garage.rs b/src/storage/garage.rs index f9ba756..d08585f 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -1,10 +1,6 @@ use crate::storage::*; +use aws_sdk_s3::{self as s3, error::SdkError, operation::get_object::GetObjectError}; use serde::Serialize; -use aws_sdk_s3::{ - self as s3, - error::SdkError, - operation::get_object::GetObjectError, -}; #[derive(Clone, Debug, Serialize)] pub struct GarageConf { @@ -28,18 +24,18 @@ impl GarageBuilder { unicity.extend_from_slice(file!().as_bytes()); unicity.append(&mut rmp_serde::to_vec(&conf)?); Ok(Arc::new(Self { conf, unicity })) - } + } } #[async_trait] impl IBuilder for GarageBuilder { async fn build(&self) -> Result { let s3_creds = s3::config::Credentials::new( - self.conf.aws_access_key_id.clone(), - self.conf.aws_secret_access_key.clone(), - None, - None, - "aerogramme" + self.conf.aws_access_key_id.clone(), + self.conf.aws_secret_access_key.clone(), + None, + None, + "aerogramme", ); let s3_config = aws_config::from_env() @@ -51,12 +47,12 @@ impl IBuilder for GarageBuilder { let s3_client = aws_sdk_s3::Client::new(&s3_config); let k2v_config = k2v_client::K2vClientConfig { - endpoint: self.conf.k2v_endpoint.clone(), - region: self.conf.region.clone(), - aws_access_key_id: self.conf.aws_access_key_id.clone(), - aws_secret_access_key: self.conf.aws_secret_access_key.clone(), - bucket: self.conf.bucket.clone(), - user_agent: None, + endpoint: self.conf.k2v_endpoint.clone(), + region: self.conf.region.clone(), + aws_access_key_id: self.conf.aws_access_key_id.clone(), + aws_secret_access_key: self.conf.aws_secret_access_key.clone(), + bucket: self.conf.bucket.clone(), + user_agent: None, }; let k2v_client = match k2v_client::K2vClient::new(k2v_config) { @@ -67,7 +63,7 @@ impl IBuilder for GarageBuilder { Ok(v) => v, }; - Ok(Box::new(GarageStore { + Ok(Box::new(GarageStore { bucket: self.conf.bucket.clone(), s3: s3_client, k2v: k2v_client, @@ -86,19 +82,30 @@ pub struct GarageStore { fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) -> RowVal { let new_row_ref = row_ref.with_causality(causal_value.causality.into()); - let row_values = causal_value.value.into_iter().map(|k2v_value| match k2v_value { - k2v_client::K2vValue::Tombstone => Alternative::Tombstone, - k2v_client::K2vValue::Value(v) => Alternative::Value(v), - }).collect::>(); + let row_values = causal_value + .value + .into_iter() + .map(|k2v_value| match k2v_value { + k2v_client::K2vValue::Tombstone => Alternative::Tombstone, + k2v_client::K2vValue::Value(v) => Alternative::Value(v), + }) + .collect::>(); - RowVal { row_ref: new_row_ref, value: row_values } + RowVal { + row_ref: new_row_ref, + value: row_values, + } } #[async_trait] impl IStore for GarageStore { async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result, StorageError> { let (pk_list, batch_op) = match select { - Selector::Range { shard, sort_begin, sort_end } => ( + Selector::Range { + shard, + sort_begin, + sort_end, + } => ( vec![shard.to_string()], vec![k2v_client::BatchReadOp { partition_key: shard, @@ -108,49 +115,71 @@ impl IStore for GarageStore { ..k2v_client::Filter::default() }, ..k2v_client::BatchReadOp::default() - }] + }], ), Selector::List(row_ref_list) => ( - row_ref_list.iter().map(|row_ref| row_ref.uid.shard.to_string()).collect::>(), - row_ref_list.iter().map(|row_ref| k2v_client::BatchReadOp { - partition_key: &row_ref.uid.shard, + row_ref_list + .iter() + .map(|row_ref| row_ref.uid.shard.to_string()) + .collect::>(), + row_ref_list + .iter() + .map(|row_ref| k2v_client::BatchReadOp { + partition_key: &row_ref.uid.shard, + filter: k2v_client::Filter { + start: Some(&row_ref.uid.sort), + ..k2v_client::Filter::default() + }, + single_item: true, + ..k2v_client::BatchReadOp::default() + }) + .collect::>(), + ), + Selector::Prefix { shard, sort_prefix } => ( + vec![shard.to_string()], + vec![k2v_client::BatchReadOp { + partition_key: shard, filter: k2v_client::Filter { - start: Some(&row_ref.uid.sort), + prefix: Some(sort_prefix), ..k2v_client::Filter::default() }, - single_item: true, ..k2v_client::BatchReadOp::default() - }).collect::>() + }], ), - Selector::Prefix { shard, sort_prefix } => ( - vec![shard.to_string()], - vec![k2v_client::BatchReadOp { - partition_key: shard, - filter: k2v_client::Filter { - prefix: Some(sort_prefix), - ..k2v_client::Filter::default() - }, - ..k2v_client::BatchReadOp::default() - }]), Selector::Single(row_ref) => { - let causal_value = match self.k2v.read_item(&row_ref.uid.shard, &row_ref.uid.sort).await { + let causal_value = match self + .k2v + .read_item(&row_ref.uid.shard, &row_ref.uid.sort) + .await + { Err(e) => { - tracing::error!("K2V read item shard={}, sort={}, bucket={} failed: {}", row_ref.uid.shard, row_ref.uid.sort, self.bucket, e); + tracing::error!( + "K2V read item shard={}, sort={}, bucket={} failed: {}", + row_ref.uid.shard, + row_ref.uid.sort, + self.bucket, + e + ); return Err(StorageError::Internal); - }, + } Ok(v) => v, }; let row_val = causal_to_row_val((*row_ref).clone(), causal_value); - return Ok(vec![row_val]) - }, + return Ok(vec![row_val]); + } }; let all_raw_res = match self.k2v.read_batch(&batch_op).await { Err(e) => { - tracing::error!("k2v read batch failed for {:?}, bucket {} with err: {}", select, self.bucket, e); + tracing::error!( + "k2v read batch failed for {:?}, bucket {} with err: {}", + select, + self.bucket, + e + ); return Err(StorageError::Internal); - }, + } Ok(v) => v, }; @@ -163,13 +192,17 @@ impl IStore for GarageStore { .into_iter() .zip(pk_list.into_iter()) .map(|((sk, cv), pk)| causal_to_row_val(RowRef::new(&pk, &sk), cv)) - .collect::>(); + .collect::>(); Ok(row_vals) } async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { let del_op = match select { - Selector::Range { shard, sort_begin, sort_end } => vec![k2v_client::BatchDeleteOp { + Selector::Range { + shard, + sort_begin, + sort_end, + } => vec![k2v_client::BatchDeleteOp { partition_key: shard, prefix: None, start: Some(sort_begin), @@ -178,21 +211,24 @@ impl IStore for GarageStore { }], Selector::List(row_ref_list) => { // Insert null values with causality token = delete - let batch_op = row_ref_list.iter().map(|v| k2v_client::BatchInsertOp { - partition_key: &v.uid.shard, - sort_key: &v.uid.sort, - causality: v.causality.clone().map(|ct| ct.into()), - value: k2v_client::K2vValue::Tombstone, - }).collect::>(); - + let batch_op = row_ref_list + .iter() + .map(|v| k2v_client::BatchInsertOp { + partition_key: &v.uid.shard, + sort_key: &v.uid.sort, + causality: v.causality.clone().map(|ct| ct.into()), + value: k2v_client::K2vValue::Tombstone, + }) + .collect::>(); + return match self.k2v.insert_batch(&batch_op).await { Err(e) => { tracing::error!("Unable to delete the list of values: {}", e); Err(StorageError::Internal) - }, + } Ok(_) => Ok(()), }; - }, + } Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchDeleteOp { partition_key: shard, prefix: Some(sort_prefix), @@ -208,15 +244,15 @@ impl IStore for GarageStore { causality: row_ref.causality.clone().map(|ct| ct.into()), value: k2v_client::K2vValue::Tombstone, }]; - + return match self.k2v.insert_batch(&batch_op).await { Err(e) => { tracing::error!("Unable to delete the list of values: {}", e); Err(StorageError::Internal) - }, + } Ok(_) => Ok(()), }; - }, + } }; // Finally here we only have prefix & range @@ -224,34 +260,46 @@ impl IStore for GarageStore { Err(e) => { tracing::error!("delete batch error: {}", e); Err(StorageError::Internal) - }, + } Ok(_) => Ok(()), } } async fn row_insert(&self, values: Vec) -> Result<(), StorageError> { - let batch_ops = values.iter().map(|v| k2v_client::BatchInsertOp { - partition_key: &v.row_ref.uid.shard, - sort_key: &v.row_ref.uid.sort, - causality: v.row_ref.causality.clone().map(|ct| ct.into()), - value: v.value.iter().next().map(|cv| match cv { - Alternative::Value(buff) => k2v_client::K2vValue::Value(buff.clone()), - Alternative::Tombstone => k2v_client::K2vValue::Tombstone, - }).unwrap_or(k2v_client::K2vValue::Tombstone) - }).collect::>(); + let batch_ops = values + .iter() + .map(|v| k2v_client::BatchInsertOp { + partition_key: &v.row_ref.uid.shard, + sort_key: &v.row_ref.uid.sort, + causality: v.row_ref.causality.clone().map(|ct| ct.into()), + value: v + .value + .iter() + .next() + .map(|cv| match cv { + Alternative::Value(buff) => k2v_client::K2vValue::Value(buff.clone()), + Alternative::Tombstone => k2v_client::K2vValue::Tombstone, + }) + .unwrap_or(k2v_client::K2vValue::Tombstone), + }) + .collect::>(); match self.k2v.insert_batch(&batch_ops).await { Err(e) => { tracing::error!("k2v can't insert some value: {}", e); Err(StorageError::Internal) - }, + } Ok(v) => Ok(v), } } async fn row_poll(&self, value: &RowRef) -> Result { loop { if let Some(ct) = &value.causality { - match self.k2v.poll_item(&value.uid.shard, &value.uid.sort, ct.clone().into(), None).await { + match self + .k2v + .poll_item(&value.uid.shard, &value.uid.sort, ct.clone().into(), None) + .await + { Err(e) => { tracing::error!("Unable to poll item: {}", e); return Err(StorageError::Internal); @@ -262,8 +310,7 @@ impl IStore for GarageStore { } else { match self.k2v.read_item(&value.uid.shard, &value.uid.sort).await { Err(k2v_client::Error::NotFound) => { - self - .k2v + self.k2v .insert_item(&value.uid.shard, &value.uid.sort, vec![0u8], None) .await .map_err(|e| { @@ -273,8 +320,8 @@ impl IStore for GarageStore { } Err(e) => { tracing::error!("Unable to read item in polling logic: {}", e); - return Err(StorageError::Internal) - }, + return Err(StorageError::Internal); + } Ok(cv) => return Ok(causal_to_row_val(value.clone(), cv)), } } @@ -282,7 +329,8 @@ impl IStore for GarageStore { } async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result { - let maybe_out = self.s3 + let maybe_out = self + .s3 .get_object() .bucket(self.bucket.to_string()) .key(blob_ref.0.to_string()) @@ -296,12 +344,12 @@ impl IStore for GarageStore { e => { tracing::warn!("Blob Fetch Error, Service Error: {}", e); return Err(StorageError::Internal); - }, + } }, Err(e) => { tracing::warn!("Blob Fetch Error, {}", e); return Err(StorageError::Internal); - }, + } }; let buffer = match object_output.body.collect().await { @@ -316,9 +364,10 @@ impl IStore for GarageStore { Ok(BlobVal::new(blob_ref.clone(), buffer)) } async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { - let streamable_value = s3::primitives::ByteStream::from(blob_val.value); + let streamable_value = s3::primitives::ByteStream::from(blob_val.value); - let maybe_send = self.s3 + let maybe_send = self + .s3 .put_object() .bucket(self.bucket.to_string()) .key(blob_val.blob_ref.0.to_string()) @@ -338,7 +387,8 @@ impl IStore for GarageStore { } } async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> { - let maybe_copy = self.s3 + let maybe_copy = self + .s3 .copy_object() .bucket(self.bucket.to_string()) .key(dst.0.clone()) @@ -348,18 +398,24 @@ impl IStore for GarageStore { match maybe_copy { Err(e) => { - tracing::error!("unable to copy object {} to {} (bucket: {}), error: {}", src.0, dst.0, self.bucket, e); + tracing::error!( + "unable to copy object {} to {} (bucket: {}), error: {}", + src.0, + dst.0, + self.bucket, + e + ); Err(StorageError::Internal) - }, + } Ok(_) => { tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.bucket); Ok(()) } } - } async fn blob_list(&self, prefix: &str) -> Result, StorageError> { - let maybe_list = self.s3 + let maybe_list = self + .s3 .list_objects_v2() .bucket(self.bucket.to_string()) .prefix(prefix) @@ -370,7 +426,12 @@ impl IStore for GarageStore { match maybe_list { Err(e) => { - tracing::error!("listing prefix {} on bucket {} failed: {}", prefix, self.bucket, e); + tracing::error!( + "listing prefix {} on bucket {} failed: {}", + prefix, + self.bucket, + e + ); Err(StorageError::Internal) } Ok(pagin_list_out) => Ok(pagin_list_out @@ -382,7 +443,8 @@ impl IStore for GarageStore { } } async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { - let maybe_delete = self.s3 + let maybe_delete = self + .s3 .delete_object() .bucket(self.bucket.to_string()) .key(blob_ref.0.clone()) @@ -391,9 +453,14 @@ impl IStore for GarageStore { match maybe_delete { Err(e) => { - tracing::error!("unable to delete {} (bucket: {}), error {}", blob_ref.0, self.bucket, e); + tracing::error!( + "unable to delete {} (bucket: {}), error {}", + blob_ref.0, + self.bucket, + e + ); Err(StorageError::Internal) - }, + } Ok(_) => { tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.bucket); Ok(()) @@ -401,4 +468,3 @@ impl IStore for GarageStore { } } } - diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs index ee7c9a6..3c3a94c 100644 --- a/src/storage/in_memory.rs +++ b/src/storage/in_memory.rs @@ -1,6 +1,6 @@ use crate::storage::*; -use std::collections::{HashMap, BTreeMap}; -use std::ops::Bound::{Included, Unbounded, Excluded, self}; +use std::collections::{BTreeMap, HashMap}; +use std::ops::Bound::{self, Excluded, Included, Unbounded}; use std::sync::{Arc, RwLock}; use tokio::sync::Notify; @@ -16,7 +16,7 @@ impl MemDb { Self(tokio::sync::Mutex::new(HashMap::new())) } - pub async fn builder(&self, username: &str) -> Arc { + pub async fn builder(&self, username: &str) -> Arc { let mut global_storage = self.0.lock().await; global_storage .entry(username.to_string()) @@ -60,8 +60,8 @@ impl InternalRowVal { } fn to_row_val(&self, row_ref: RowRef) -> RowVal { - RowVal{ - row_ref: row_ref.with_causality(self.version.to_string()), + RowVal { + row_ref: row_ref.with_causality(self.version.to_string()), value: self.concurrent_values(), } } @@ -75,7 +75,7 @@ struct InternalBlobVal { impl InternalBlobVal { fn to_blob_val(&self, bref: &BlobRef) -> BlobVal { BlobVal { - blob_ref: bref.clone(), + blob_ref: bref.clone(), meta: self.metadata.clone(), value: self.data.clone(), } @@ -113,7 +113,7 @@ impl IBuilder for MemBuilder { row: self.row.clone(), blob: self.blob.clone(), })) - } + } fn unique(&self) -> UnicityBuffer { UnicityBuffer(self.unicity.clone()) @@ -170,24 +170,32 @@ impl IStore for MemStore { let store = self.row.read().or(Err(StorageError::Internal))?; match select { - Selector::Range { shard, sort_begin, sort_end } => { - Ok(store - .get(*shard) - .unwrap_or(&BTreeMap::new()) - .range((Included(sort_begin.to_string()), Excluded(sort_end.to_string()))) - .map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) - .collect::>()) - }, + Selector::Range { + shard, + sort_begin, + sort_end, + } => Ok(store + .get(*shard) + .unwrap_or(&BTreeMap::new()) + .range(( + Included(sort_begin.to_string()), + Excluded(sort_end.to_string()), + )) + .map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) + .collect::>()), Selector::List(rlist) => { let mut acc = vec![]; for row_ref in rlist { - let maybe_intval = store.get(&row_ref.uid.shard).map(|v| v.get(&row_ref.uid.sort)).flatten(); + let maybe_intval = store + .get(&row_ref.uid.shard) + .map(|v| v.get(&row_ref.uid.sort)) + .flatten(); if let Some(intval) = maybe_intval { acc.push(intval.to_row_val(row_ref.clone())); } } Ok(acc) - }, + } Selector::Prefix { shard, sort_prefix } => { let last_bound = prefix_last_bound(sort_prefix); @@ -197,13 +205,13 @@ impl IStore for MemStore { .range((Included(sort_prefix.to_string()), last_bound)) .map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) .collect::>()) - }, + } Selector::Single(row_ref) => { let intval = store - .get(&row_ref.uid.shard) - .ok_or(StorageError::NotFound)? - .get(&row_ref.uid.sort) - .ok_or(StorageError::NotFound)?; + .get(&row_ref.uid.shard) + .ok_or(StorageError::NotFound)? + .get(&row_ref.uid.sort) + .ok_or(StorageError::NotFound)?; Ok(vec![intval.to_row_val((*row_ref).clone())]) } } @@ -213,7 +221,12 @@ impl IStore for MemStore { tracing::trace!(select=%select, command="row_rm"); let values = match select { - Selector::Range { .. } | Selector::Prefix { .. } => self.row_fetch(select).await?.into_iter().map(|rv| rv.row_ref).collect::>(), + Selector::Range { .. } | Selector::Prefix { .. } => self + .row_fetch(select) + .await? + .into_iter() + .map(|rv| rv.row_ref) + .collect::>(), Selector::List(rlist) => rlist.clone(), Selector::Single(row_ref) => vec![(*row_ref).clone()], }; @@ -282,7 +295,10 @@ impl IStore for MemStore { async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result { tracing::trace!(entry=%blob_ref, command="blob_fetch"); let store = self.blob.read().or(Err(StorageError::Internal))?; - store.get(&blob_ref.0).ok_or(StorageError::NotFound).map(|v| v.to_blob_val(blob_ref)) + store + .get(&blob_ref.0) + .ok_or(StorageError::NotFound) + .map(|v| v.to_blob_val(blob_ref)) } async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert"); @@ -300,10 +316,13 @@ impl IStore for MemStore { Ok(()) } async fn blob_list(&self, prefix: &str) -> Result, StorageError> { - tracing::trace!(prefix=prefix, command="blob_list"); + tracing::trace!(prefix = prefix, command = "blob_list"); let store = self.blob.read().or(Err(StorageError::Internal))?; let last_bound = prefix_last_bound(prefix); - let blist = store.range((Included(prefix.to_string()), last_bound)).map(|(k, _)| BlobRef(k.to_string())).collect::>(); + let blist = store + .range((Included(prefix.to_string()), last_bound)) + .map(|(k, _)| BlobRef(k.to_string())) + .collect::>(); Ok(blist) } async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index c81ffe4..1f86f71 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -8,13 +8,13 @@ * into the object system so it is not exposed. */ -pub mod in_memory; pub mod garage; +pub mod in_memory; -use std::sync::Arc; -use std::hash::Hash; -use std::collections::HashMap; use async_trait::async_trait; +use std::collections::HashMap; +use std::hash::Hash; +use std::sync::Arc; #[derive(Debug, Clone)] pub enum Alternative { @@ -52,14 +52,18 @@ pub struct RowRef { } impl std::fmt::Display for RowRef { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "RowRef({}, {}, {:?})", self.uid.shard, self.uid.sort, self.causality) + write!( + f, + "RowRef({}, {}, {:?})", + self.uid.shard, self.uid.sort, self.causality + ) } } impl RowRef { pub fn new(shard: &str, sort: &str) -> Self { Self { - uid: RowUid { + uid: RowUid { shard: shard.to_string(), sort: sort.to_string(), }, @@ -87,7 +91,6 @@ impl RowVal { } } - #[derive(Debug, Clone)] pub struct BlobRef(pub String); impl std::fmt::Display for BlobRef { @@ -105,7 +108,8 @@ pub struct BlobVal { impl BlobVal { pub fn new(blob_ref: BlobRef, value: Vec) -> Self { Self { - blob_ref, value, + blob_ref, + value, meta: HashMap::new(), } } @@ -118,16 +122,27 @@ impl BlobVal { #[derive(Debug)] pub enum Selector<'a> { - Range { shard: &'a str, sort_begin: &'a str, sort_end: &'a str }, - List (Vec), // list of (shard_key, sort_key) + Range { + shard: &'a str, + sort_begin: &'a str, + sort_end: &'a str, + }, + List(Vec), // list of (shard_key, sort_key) #[allow(dead_code)] - Prefix { shard: &'a str, sort_prefix: &'a str }, + Prefix { + shard: &'a str, + sort_prefix: &'a str, + }, Single(&'a RowRef), } impl<'a> std::fmt::Display for Selector<'a> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Range { shard, sort_begin, sort_end } => write!(f, "Range({}, [{}, {}[)", shard, sort_begin, sort_end), + Self::Range { + shard, + sort_begin, + sort_end, + } => write!(f, "Range({}, [{}, {}[)", shard, sort_begin, sort_end), Self::List(list) => write!(f, "List({:?})", list), Self::Prefix { shard, sort_prefix } => write!(f, "Prefix({}, {})", shard, sort_prefix), Self::Single(row_ref) => write!(f, "Single({})", row_ref), @@ -149,7 +164,7 @@ pub trait IStore { async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>; } -#[derive(Clone,Debug,PartialEq,Eq,Hash)] +#[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct UnicityBuffer(Vec); #[async_trait] -- cgit v1.2.3