author    | Quentin Dufour <quentin@deuxfleurs.fr> | 2024-03-08 08:17:03 +0100
committer | Quentin Dufour <quentin@deuxfleurs.fr> | 2024-03-08 08:17:03 +0100
commit    | 1a43ce5ac7033c148f64a033f2b1d335e95e11d5 (patch)
tree      | 60b234604170fe207248458a9c4cdd3f4b7c36f2 /src/storage
parent    | bb9cb386b65834c44cae86bd100f800883022062 (diff)
WIP refactor
Diffstat (limited to 'src/storage')
-rw-r--r-- | src/storage/garage.rs    | 538
-rw-r--r-- | src/storage/in_memory.rs | 334
-rw-r--r-- | src/storage/mod.rs       | 179

3 files changed, 0 insertions(+), 1051 deletions(-)
diff --git a/src/storage/garage.rs b/src/storage/garage.rs
deleted file mode 100644
index 7152764..0000000
--- a/src/storage/garage.rs
+++ /dev/null
@@ -1,538 +0,0 @@
-use aws_sdk_s3::{self as s3, error::SdkError, operation::get_object::GetObjectError};
-use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder;
-use aws_smithy_runtime_api::client::http::SharedHttpClient;
-use hyper_rustls::HttpsConnector;
-use hyper_util::client::legacy::{connect::HttpConnector, Client as HttpClient};
-use hyper_util::rt::TokioExecutor;
-use serde::Serialize;
-
-use crate::storage::*;
-
-pub struct GarageRoot {
-    k2v_http: HttpClient<HttpsConnector<HttpConnector>, k2v_client::Body>,
-    aws_http: SharedHttpClient,
-}
-
-impl GarageRoot {
-    pub fn new() -> anyhow::Result<Self> {
-        let connector = hyper_rustls::HttpsConnectorBuilder::new()
-            .with_native_roots()?
-            .https_or_http()
-            .enable_http1()
-            .enable_http2()
-            .build();
-        let k2v_http = HttpClient::builder(TokioExecutor::new()).build(connector);
-        let aws_http = HyperClientBuilder::new().build_https();
-        Ok(Self { k2v_http, aws_http })
-    }
-
-    pub fn user(&self, conf: GarageConf) -> anyhow::Result<Arc<GarageUser>> {
-        let mut unicity: Vec<u8> = vec![];
-        unicity.extend_from_slice(file!().as_bytes());
-        unicity.append(&mut rmp_serde::to_vec(&conf)?);
-
-        Ok(Arc::new(GarageUser {
-            conf,
-            aws_http: self.aws_http.clone(),
-            k2v_http: self.k2v_http.clone(),
-            unicity,
-        }))
-    }
-}
-
-#[derive(Clone, Debug, Serialize)]
-pub struct GarageConf {
-    pub region: String,
-    pub s3_endpoint: String,
-    pub k2v_endpoint: String,
-    pub aws_access_key_id: String,
-    pub aws_secret_access_key: String,
-    pub bucket: String,
-}
-
-//@FIXME we should get rid of this builder
-//and allocate a S3 + K2V client only once per user
-//(and using a shared HTTP client)
-#[derive(Clone, Debug)]
-pub struct GarageUser {
-    conf: GarageConf,
-    aws_http: SharedHttpClient,
-    k2v_http: HttpClient<HttpsConnector<HttpConnector>, k2v_client::Body>,
-    unicity: Vec<u8>,
-}
-
-#[async_trait]
-impl IBuilder for GarageUser {
-    async fn build(&self) -> Result<Store, StorageError> {
-        let s3_creds = s3::config::Credentials::new(
-            self.conf.aws_access_key_id.clone(),
-            self.conf.aws_secret_access_key.clone(),
-            None,
-            None,
-            "aerogramme",
-        );
-
-        let sdk_config = aws_config::from_env()
-            .region(aws_config::Region::new(self.conf.region.clone()))
-            .credentials_provider(s3_creds)
-            .http_client(self.aws_http.clone())
-            .endpoint_url(self.conf.s3_endpoint.clone())
-            .load()
-            .await;
-
-        let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config)
-            .force_path_style(true)
-            .build();
-
-        let s3_client = aws_sdk_s3::Client::from_conf(s3_config);
-
-        let k2v_config = k2v_client::K2vClientConfig {
-            endpoint: self.conf.k2v_endpoint.clone(),
-            region: self.conf.region.clone(),
-            aws_access_key_id: self.conf.aws_access_key_id.clone(),
-            aws_secret_access_key: self.conf.aws_secret_access_key.clone(),
-            bucket: self.conf.bucket.clone(),
-            user_agent: None,
-        };
-
-        let k2v_client =
-            match k2v_client::K2vClient::new_with_client(k2v_config, self.k2v_http.clone()) {
-                Err(e) => {
-                    tracing::error!("unable to build k2v client: {}", e);
-                    return Err(StorageError::Internal);
-                }
-                Ok(v) => v,
-            };
-
-        Ok(Box::new(GarageStore {
-            bucket: self.conf.bucket.clone(),
-            s3: s3_client,
-            k2v: k2v_client,
-        }))
-    }
-    fn unique(&self) -> UnicityBuffer {
-        UnicityBuffer(self.unicity.clone())
-    }
-}
-
-pub struct GarageStore {
-    bucket: String,
-    s3: s3::Client,
-    k2v: k2v_client::K2vClient,
-}
-
-fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) -> RowVal {
-    let new_row_ref = row_ref.with_causality(causal_value.causality.into());
-    let row_values = causal_value
-        .value
-        .into_iter()
-        .map(|k2v_value| match k2v_value {
-            k2v_client::K2vValue::Tombstone => Alternative::Tombstone,
-            k2v_client::K2vValue::Value(v) => Alternative::Value(v),
-        })
-        .collect::<Vec<_>>();
-
-    RowVal {
-        row_ref: new_row_ref,
-        value: row_values,
-    }
-}
-
-#[async_trait]
-impl IStore for GarageStore {
-    async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
-        tracing::trace!(select=%select, command="row_fetch");
-        let (pk_list, batch_op) = match select {
-            Selector::Range {
-                shard,
-                sort_begin,
-                sort_end,
-            } => (
-                vec![shard.to_string()],
-                vec![k2v_client::BatchReadOp {
-                    partition_key: shard,
-                    filter: k2v_client::Filter {
-                        start: Some(sort_begin),
-                        end: Some(sort_end),
-                        ..k2v_client::Filter::default()
-                    },
-                    ..k2v_client::BatchReadOp::default()
-                }],
-            ),
-            Selector::List(row_ref_list) => (
-                row_ref_list
-                    .iter()
-                    .map(|row_ref| row_ref.uid.shard.to_string())
-                    .collect::<Vec<_>>(),
-                row_ref_list
-                    .iter()
-                    .map(|row_ref| k2v_client::BatchReadOp {
-                        partition_key: &row_ref.uid.shard,
-                        filter: k2v_client::Filter {
-                            start: Some(&row_ref.uid.sort),
-                            ..k2v_client::Filter::default()
-                        },
-                        single_item: true,
-                        ..k2v_client::BatchReadOp::default()
-                    })
-                    .collect::<Vec<_>>(),
-            ),
-            Selector::Prefix { shard, sort_prefix } => (
-                vec![shard.to_string()],
-                vec![k2v_client::BatchReadOp {
-                    partition_key: shard,
-                    filter: k2v_client::Filter {
-                        prefix: Some(sort_prefix),
-                        ..k2v_client::Filter::default()
-                    },
-                    ..k2v_client::BatchReadOp::default()
-                }],
-            ),
-            Selector::Single(row_ref) => {
-                let causal_value = match self
-                    .k2v
-                    .read_item(&row_ref.uid.shard, &row_ref.uid.sort)
-                    .await
-                {
-                    Err(k2v_client::Error::NotFound) => {
-                        tracing::debug!(
-                            "K2V item not found shard={}, sort={}, bucket={}",
-                            row_ref.uid.shard,
-                            row_ref.uid.sort,
-                            self.bucket,
-                        );
-                        return Err(StorageError::NotFound);
-                    }
-                    Err(e) => {
-                        tracing::error!(
-                            "K2V read item shard={}, sort={}, bucket={} failed: {}",
-                            row_ref.uid.shard,
-                            row_ref.uid.sort,
-                            self.bucket,
-                            e
-                        );
-                        return Err(StorageError::Internal);
-                    }
-                    Ok(v) => v,
-                };
-
-                let row_val = causal_to_row_val((*row_ref).clone(), causal_value);
-                return Ok(vec![row_val]);
-            }
-        };
-
-        let all_raw_res = match self.k2v.read_batch(&batch_op).await {
-            Err(e) => {
-                tracing::error!(
-                    "k2v read batch failed for {:?}, bucket {} with err: {}",
-                    select,
-                    self.bucket,
-                    e
-                );
-                return Err(StorageError::Internal);
-            }
-            Ok(v) => v,
-        };
-        //println!("fetch res -> {:?}", all_raw_res);
-
-        let row_vals =
-            all_raw_res
-                .into_iter()
-                .zip(pk_list.into_iter())
-                .fold(vec![], |mut acc, (page, pk)| {
-                    page.items
-                        .into_iter()
-                        .map(|(sk, cv)| causal_to_row_val(RowRef::new(&pk, &sk), cv))
-                        .for_each(|rr| acc.push(rr));
-
-                    acc
-                });
-        tracing::debug!(fetch_count = row_vals.len(), command = "row_fetch");
-
-        Ok(row_vals)
-    }
-    async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
-        tracing::trace!(select=%select, command="row_rm");
-        let del_op = match select {
-            Selector::Range {
-                shard,
-                sort_begin,
-                sort_end,
-            } => vec![k2v_client::BatchDeleteOp {
-                partition_key: shard,
-                prefix: None,
-                start: Some(sort_begin),
-                end: Some(sort_end),
-                single_item: false,
-            }],
-            Selector::List(row_ref_list) => {
-                // Insert null values with causality token = delete
-                let batch_op = row_ref_list
-                    .iter()
-                    .map(|v| k2v_client::BatchInsertOp {
-                        partition_key: &v.uid.shard,
-                        sort_key: &v.uid.sort,
-                        causality: v.causality.clone().map(|ct| ct.into()),
-                        value: k2v_client::K2vValue::Tombstone,
-                    })
-                    .collect::<Vec<_>>();
-
-                return match self.k2v.insert_batch(&batch_op).await {
-                    Err(e) => {
-                        tracing::error!("Unable to delete the list of values: {}", e);
-                        Err(StorageError::Internal)
-                    }
-                    Ok(_) => Ok(()),
-                };
-            }
-            Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchDeleteOp {
-                partition_key: shard,
-                prefix: Some(sort_prefix),
-                start: None,
-                end: None,
-                single_item: false,
-            }],
-            Selector::Single(row_ref) => {
-                // Insert null values with causality token = delete
-                let batch_op = vec![k2v_client::BatchInsertOp {
-                    partition_key: &row_ref.uid.shard,
-                    sort_key: &row_ref.uid.sort,
-                    causality: row_ref.causality.clone().map(|ct| ct.into()),
-                    value: k2v_client::K2vValue::Tombstone,
-                }];
-
-                return match self.k2v.insert_batch(&batch_op).await {
-                    Err(e) => {
-                        tracing::error!("Unable to delete the list of values: {}", e);
-                        Err(StorageError::Internal)
-                    }
-                    Ok(_) => Ok(()),
-                };
-            }
-        };
-
-        // Finally here we only have prefix & range
-        match self.k2v.delete_batch(&del_op).await {
-            Err(e) => {
-                tracing::error!("delete batch error: {}", e);
-                Err(StorageError::Internal)
-            }
-            Ok(_) => Ok(()),
-        }
-    }
-
-    async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
-        tracing::trace!(entries=%values.iter().map(|v| v.row_ref.to_string()).collect::<Vec<_>>().join(","), command="row_insert");
-        let batch_ops = values
-            .iter()
-            .map(|v| k2v_client::BatchInsertOp {
-                partition_key: &v.row_ref.uid.shard,
-                sort_key: &v.row_ref.uid.sort,
-                causality: v.row_ref.causality.clone().map(|ct| ct.into()),
-                value: v
-                    .value
-                    .iter()
-                    .next()
-                    .map(|cv| match cv {
-                        Alternative::Value(buff) => k2v_client::K2vValue::Value(buff.clone()),
-                        Alternative::Tombstone => k2v_client::K2vValue::Tombstone,
-                    })
-                    .unwrap_or(k2v_client::K2vValue::Tombstone),
-            })
-            .collect::<Vec<_>>();
-
-        match self.k2v.insert_batch(&batch_ops).await {
-            Err(e) => {
-                tracing::error!("k2v can't insert some value: {}", e);
-                Err(StorageError::Internal)
-            }
-            Ok(v) => Ok(v),
-        }
-    }
-    async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
-        tracing::trace!(entry=%value, command="row_poll");
-        loop {
-            if let Some(ct) = &value.causality {
-                match self
-                    .k2v
-                    .poll_item(&value.uid.shard, &value.uid.sort, ct.clone().into(), None)
-                    .await
-                {
-                    Err(e) => {
-                        tracing::error!("Unable to poll item: {}", e);
-                        return Err(StorageError::Internal);
-                    }
-                    Ok(None) => continue,
-                    Ok(Some(cv)) => return Ok(causal_to_row_val(value.clone(), cv)),
-                }
-            } else {
-                match self.k2v.read_item(&value.uid.shard, &value.uid.sort).await {
-                    Err(k2v_client::Error::NotFound) => {
-                        self.k2v
-                            .insert_item(&value.uid.shard, &value.uid.sort, vec![0u8], None)
-                            .await
-                            .map_err(|e| {
-                                tracing::error!("Unable to insert item in polling logic: {}", e);
-                                StorageError::Internal
-                            })?;
-                    }
-                    Err(e) => {
-                        tracing::error!("Unable to read item in polling logic: {}", e);
-                        return Err(StorageError::Internal);
-                    }
-                    Ok(cv) => return Ok(causal_to_row_val(value.clone(), cv)),
-                }
-            }
-        }
-    }
-
-    async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
-        tracing::trace!(entry=%blob_ref, command="blob_fetch");
-        let maybe_out = self
-            .s3
-            .get_object()
-            .bucket(self.bucket.to_string())
-            .key(blob_ref.0.to_string())
-            .send()
-            .await;
-
-        let object_output = match maybe_out {
-            Ok(output) => output,
-            Err(SdkError::ServiceError(x)) => match x.err() {
-                GetObjectError::NoSuchKey(_) => return Err(StorageError::NotFound),
-                e => {
-                    tracing::warn!("Blob Fetch Error, Service Error: {}", e);
-                    return Err(StorageError::Internal);
-                }
-            },
-            Err(e) => {
-                tracing::warn!("Blob Fetch Error, {}", e);
-                return Err(StorageError::Internal);
-            }
-        };
-
-        let buffer = match object_output.body.collect().await {
-            Ok(aggreg) => aggreg.to_vec(),
-            Err(e) => {
-                tracing::warn!("Fetching body failed with {}", e);
-                return Err(StorageError::Internal);
-            }
-        };
-
-        let mut bv = BlobVal::new(blob_ref.clone(), buffer);
-        if let Some(meta) = object_output.metadata {
-            bv.meta = meta;
-        }
-        tracing::debug!("Fetched {}/{}", self.bucket, blob_ref.0);
-        Ok(bv)
-    }
-    async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> {
-        tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert");
-        let streamable_value = s3::primitives::ByteStream::from(blob_val.value);
-
-        let maybe_send = self
-            .s3
-            .put_object()
-            .bucket(self.bucket.to_string())
-            .key(blob_val.blob_ref.0.to_string())
-            .set_metadata(Some(blob_val.meta))
-            .body(streamable_value)
-            .send()
-            .await;
-
-        match maybe_send {
-            Err(e) => {
-                tracing::error!("unable to send object: {}", e);
-                Err(StorageError::Internal)
-            }
-            Ok(_) => {
-                tracing::debug!("Inserted {}/{}", self.bucket, blob_val.blob_ref.0);
-                Ok(())
-            }
-        }
-    }
-    async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
-        tracing::trace!(src=%src, dst=%dst, command="blob_copy");
-        let maybe_copy = self
-            .s3
-            .copy_object()
-            .bucket(self.bucket.to_string())
-            .key(dst.0.clone())
-            .copy_source(format!("/{}/{}", self.bucket.to_string(), src.0.clone()))
-            .send()
-            .await;
-
-        match maybe_copy {
-            Err(e) => {
-                tracing::error!(
-                    "unable to copy object {} to {} (bucket: {}), error: {}",
-                    src.0,
-                    dst.0,
-                    self.bucket,
-                    e
-                );
-                Err(StorageError::Internal)
-            }
-            Ok(_) => {
-                tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.bucket);
-                Ok(())
-            }
-        }
-    }
-    async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
-        tracing::trace!(prefix = prefix, command = "blob_list");
-        let maybe_list = self
-            .s3
-            .list_objects_v2()
-            .bucket(self.bucket.to_string())
-            .prefix(prefix)
-            .into_paginator()
-            .send()
-            .try_collect()
-            .await;
-
-        match maybe_list {
-            Err(e) => {
-                tracing::error!(
-                    "listing prefix {} on bucket {} failed: {}",
-                    prefix,
-                    self.bucket,
-                    e
-                );
-                Err(StorageError::Internal)
-            }
-            Ok(pagin_list_out) => Ok(pagin_list_out
-                .into_iter()
-                .map(|list_out| list_out.contents.unwrap_or(vec![]))
-                .flatten()
-                .map(|obj| BlobRef(obj.key.unwrap_or(String::new())))
-                .collect::<Vec<_>>()),
-        }
-    }
-    async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
-        tracing::trace!(entry=%blob_ref, command="blob_rm");
-        let maybe_delete = self
-            .s3
-            .delete_object()
-            .bucket(self.bucket.to_string())
-            .key(blob_ref.0.clone())
-            .send()
-            .await;
-
-        match maybe_delete {
-            Err(e) => {
-                tracing::error!(
-                    "unable to delete {} (bucket: {}), error {}",
-                    blob_ref.0,
-                    self.bucket,
-                    e
-                );
-                Err(StorageError::Internal)
-            }
-            Ok(_) => {
-                tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.bucket);
-                Ok(())
-            }
-        }
-    }
-}
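The Garage backend deleted above builds one S3 client and one K2V client per user on top of shared HTTP clients. As a rough sketch of how it was driven (not code from this repository: the helper name, endpoints, and credentials are invented, and 3900/3904 are only Garage's usual S3/K2V ports):

```rust
use std::sync::Arc;

// Hypothetical helper; GarageRoot, GarageConf, GarageUser and Store come
// from the deleted module above, everything else is placeholder config.
async fn open_garage_store() -> anyhow::Result<Store> {
    let root = GarageRoot::new()?; // shared k2v/aws HTTP clients
    let user: Arc<GarageUser> = root.user(GarageConf {
        region: "garage".to_string(),
        s3_endpoint: "http://127.0.0.1:3900".to_string(),
        k2v_endpoint: "http://127.0.0.1:3904".to_string(),
        aws_access_key_id: "placeholder-key-id".to_string(),
        aws_secret_access_key: "placeholder-secret".to_string(),
        bucket: "aerogramme".to_string(),
    })?;
    // IBuilder::build() allocates the S3 + K2V clients for this user.
    let store: Store = user.build().await?;
    Ok(store)
}
```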
diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs
deleted file mode 100644
index 3c3a94c..0000000
--- a/src/storage/in_memory.rs
+++ /dev/null
@@ -1,334 +0,0 @@
-use crate::storage::*;
-use std::collections::{BTreeMap, HashMap};
-use std::ops::Bound::{self, Excluded, Included, Unbounded};
-use std::sync::{Arc, RwLock};
-use tokio::sync::Notify;
-
-/// This implementation is very inneficient, and not completely correct
-/// Indeed, when the connector is dropped, the memory is freed.
-/// It means that when a user disconnects, its data are lost.
-/// It's intended only for basic debugging, do not use it for advanced tests...
-
-#[derive(Debug, Default)]
-pub struct MemDb(tokio::sync::Mutex<HashMap<String, Arc<MemBuilder>>>);
-impl MemDb {
-    pub fn new() -> Self {
-        Self(tokio::sync::Mutex::new(HashMap::new()))
-    }
-
-    pub async fn builder(&self, username: &str) -> Arc<MemBuilder> {
-        let mut global_storage = self.0.lock().await;
-        global_storage
-            .entry(username.to_string())
-            .or_insert(MemBuilder::new(username))
-            .clone()
-    }
-}
-
-#[derive(Debug, Clone)]
-enum InternalData {
-    Tombstone,
-    Value(Vec<u8>),
-}
-impl InternalData {
-    fn to_alternative(&self) -> Alternative {
-        match self {
-            Self::Tombstone => Alternative::Tombstone,
-            Self::Value(x) => Alternative::Value(x.clone()),
-        }
-    }
-}
-
-#[derive(Debug)]
-struct InternalRowVal {
-    data: Vec<InternalData>,
-    version: u64,
-    change: Arc<Notify>,
-}
-impl std::default::Default for InternalRowVal {
-    fn default() -> Self {
-        Self {
-            data: vec![],
-            version: 1,
-            change: Arc::new(Notify::new()),
-        }
-    }
-}
-impl InternalRowVal {
-    fn concurrent_values(&self) -> Vec<Alternative> {
-        self.data.iter().map(InternalData::to_alternative).collect()
-    }
-
-    fn to_row_val(&self, row_ref: RowRef) -> RowVal {
-        RowVal {
-            row_ref: row_ref.with_causality(self.version.to_string()),
-            value: self.concurrent_values(),
-        }
-    }
-}
-
-#[derive(Debug, Default, Clone)]
-struct InternalBlobVal {
-    data: Vec<u8>,
-    metadata: HashMap<String, String>,
-}
-impl InternalBlobVal {
-    fn to_blob_val(&self, bref: &BlobRef) -> BlobVal {
-        BlobVal {
-            blob_ref: bref.clone(),
-            meta: self.metadata.clone(),
-            value: self.data.clone(),
-        }
-    }
-}
-
-type ArcRow = Arc<RwLock<HashMap<String, BTreeMap<String, InternalRowVal>>>>;
-type ArcBlob = Arc<RwLock<BTreeMap<String, InternalBlobVal>>>;
-
-#[derive(Clone, Debug)]
-pub struct MemBuilder {
-    unicity: Vec<u8>,
-    row: ArcRow,
-    blob: ArcBlob,
-}
-
-impl MemBuilder {
-    pub fn new(user: &str) -> Arc<Self> {
-        tracing::debug!("initialize membuilder for {}", user);
-        let mut unicity: Vec<u8> = vec![];
-        unicity.extend_from_slice(file!().as_bytes());
-        unicity.extend_from_slice(user.as_bytes());
-        Arc::new(Self {
-            unicity,
-            row: Arc::new(RwLock::new(HashMap::new())),
-            blob: Arc::new(RwLock::new(BTreeMap::new())),
-        })
-    }
-}
-
-#[async_trait]
-impl IBuilder for MemBuilder {
-    async fn build(&self) -> Result<Store, StorageError> {
-        Ok(Box::new(MemStore {
-            row: self.row.clone(),
-            blob: self.blob.clone(),
-        }))
-    }
-
-    fn unique(&self) -> UnicityBuffer {
-        UnicityBuffer(self.unicity.clone())
-    }
-}
-
-pub struct MemStore {
-    row: ArcRow,
-    blob: ArcBlob,
-}
-
-fn prefix_last_bound(prefix: &str) -> Bound<String> {
-    let mut sort_end = prefix.to_string();
-    match sort_end.pop() {
-        None => Unbounded,
-        Some(ch) => {
-            let nc = char::from_u32(ch as u32 + 1).unwrap();
-            sort_end.push(nc);
-            Excluded(sort_end)
-        }
-    }
-}
-
-impl MemStore {
-    fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> {
-        tracing::trace!(entry=%entry, command="row_rm_single");
-        let mut store = self.row.write().or(Err(StorageError::Internal))?;
-        let shard = &entry.uid.shard;
-        let sort = &entry.uid.sort;
-
-        let cauz = match entry.causality.as_ref().map(|v| v.parse::<u64>()) {
-            Some(Ok(v)) => v,
-            _ => 0,
-        };
-
-        let bt = store.entry(shard.to_string()).or_default();
-        let intval = bt.entry(sort.to_string()).or_default();
-
-        if cauz == intval.version {
-            intval.data.clear();
-        }
-        intval.data.push(InternalData::Tombstone);
-        intval.version += 1;
-        intval.change.notify_waiters();
-
-        Ok(())
-    }
-}
-
-#[async_trait]
-impl IStore for MemStore {
-    async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
-        tracing::trace!(select=%select, command="row_fetch");
-        let store = self.row.read().or(Err(StorageError::Internal))?;
-
-        match select {
-            Selector::Range {
-                shard,
-                sort_begin,
-                sort_end,
-            } => Ok(store
-                .get(*shard)
-                .unwrap_or(&BTreeMap::new())
-                .range((
-                    Included(sort_begin.to_string()),
-                    Excluded(sort_end.to_string()),
-                ))
-                .map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
-                .collect::<Vec<_>>()),
-            Selector::List(rlist) => {
-                let mut acc = vec![];
-                for row_ref in rlist {
-                    let maybe_intval = store
-                        .get(&row_ref.uid.shard)
-                        .map(|v| v.get(&row_ref.uid.sort))
-                        .flatten();
-                    if let Some(intval) = maybe_intval {
-                        acc.push(intval.to_row_val(row_ref.clone()));
-                    }
-                }
-                Ok(acc)
-            }
-            Selector::Prefix { shard, sort_prefix } => {
-                let last_bound = prefix_last_bound(sort_prefix);
-
-                Ok(store
-                    .get(*shard)
-                    .unwrap_or(&BTreeMap::new())
-                    .range((Included(sort_prefix.to_string()), last_bound))
-                    .map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
-                    .collect::<Vec<_>>())
-            }
-            Selector::Single(row_ref) => {
-                let intval = store
-                    .get(&row_ref.uid.shard)
-                    .ok_or(StorageError::NotFound)?
-                    .get(&row_ref.uid.sort)
-                    .ok_or(StorageError::NotFound)?;
-                Ok(vec![intval.to_row_val((*row_ref).clone())])
-            }
-        }
-    }
-
-    async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
-        tracing::trace!(select=%select, command="row_rm");
-
-        let values = match select {
-            Selector::Range { .. } | Selector::Prefix { .. } => self
-                .row_fetch(select)
-                .await?
-                .into_iter()
-                .map(|rv| rv.row_ref)
-                .collect::<Vec<_>>(),
-            Selector::List(rlist) => rlist.clone(),
-            Selector::Single(row_ref) => vec![(*row_ref).clone()],
-        };
-
-        for v in values.into_iter() {
-            self.row_rm_single(&v)?;
-        }
-        Ok(())
-    }
-
-    async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
-        tracing::trace!(entries=%values.iter().map(|v| v.row_ref.to_string()).collect::<Vec<_>>().join(","), command="row_insert");
-        let mut store = self.row.write().or(Err(StorageError::Internal))?;
-        for v in values.into_iter() {
-            let shard = v.row_ref.uid.shard;
-            let sort = v.row_ref.uid.sort;
-
-            let val = match v.value.into_iter().next() {
-                Some(Alternative::Value(x)) => x,
-                _ => vec![],
-            };
-
-            let cauz = match v.row_ref.causality.map(|v| v.parse::<u64>()) {
-                Some(Ok(v)) => v,
-                _ => 0,
-            };
-
-            let bt = store.entry(shard).or_default();
-            let intval = bt.entry(sort).or_default();
-
-            if cauz == intval.version {
-                intval.data.clear();
-            }
-            intval.data.push(InternalData::Value(val));
-            intval.version += 1;
-            intval.change.notify_waiters();
-        }
-        Ok(())
-    }
-    async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
-        tracing::trace!(entry=%value, command="row_poll");
-        let shard = &value.uid.shard;
-        let sort = &value.uid.sort;
-        let cauz = match value.causality.as_ref().map(|v| v.parse::<u64>()) {
-            Some(Ok(v)) => v,
-            _ => 0,
-        };
-
-        let notify_me = {
-            let mut store = self.row.write().or(Err(StorageError::Internal))?;
-            let bt = store.entry(shard.to_string()).or_default();
-            let intval = bt.entry(sort.to_string()).or_default();
-
-            if intval.version != cauz {
-                return Ok(intval.to_row_val(value.clone()));
-            }
-            intval.change.clone()
-        };
-
-        notify_me.notified().await;
-
-        let res = self.row_fetch(&Selector::Single(value)).await?;
-        res.into_iter().next().ok_or(StorageError::NotFound)
-    }
-
-    async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
-        tracing::trace!(entry=%blob_ref, command="blob_fetch");
-        let store = self.blob.read().or(Err(StorageError::Internal))?;
-        store
-            .get(&blob_ref.0)
-            .ok_or(StorageError::NotFound)
-            .map(|v| v.to_blob_val(blob_ref))
-    }
-    async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> {
-        tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert");
-        let mut store = self.blob.write().or(Err(StorageError::Internal))?;
-        let entry = store.entry(blob_val.blob_ref.0.clone()).or_default();
-        entry.data = blob_val.value.clone();
-        entry.metadata = blob_val.meta.clone();
-        Ok(())
-    }
-    async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
-        tracing::trace!(src=%src, dst=%dst, command="blob_copy");
-        let mut store = self.blob.write().or(Err(StorageError::Internal))?;
-        let blob_src = store.entry(src.0.clone()).or_default().clone();
-        store.insert(dst.0.clone(), blob_src);
-        Ok(())
-    }
-    async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
-        tracing::trace!(prefix = prefix, command = "blob_list");
-        let store = self.blob.read().or(Err(StorageError::Internal))?;
-        let last_bound = prefix_last_bound(prefix);
-        let blist = store
-            .range((Included(prefix.to_string()), last_bound))
-            .map(|(k, _)| BlobRef(k.to_string()))
-            .collect::<Vec<_>>();
-        Ok(blist)
-    }
-    async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
-        tracing::trace!(entry=%blob_ref, command="blob_rm");
-        let mut store = self.blob.write().or(Err(StorageError::Internal))?;
-        store.remove(&blob_ref.0);
-        Ok(())
-    }
-}
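The in-memory backend deleted above keys all state on the username handed to MemDb::builder, and its causality token is simply the row's version counter rendered as a string. A minimal usage sketch, assuming the module were still present (the shard and sort keys are invented):

```rust
// Minimal sketch against the deleted in-memory backend; "mailbox:inbox"
// and "uid:1" are invented keys for illustration.
async fn demo_mem() -> Result<(), StorageError> {
    let db = MemDb::new();
    // Same username => same MemBuilder, so data survives reconnections
    // for as long as this MemDb value lives.
    let store = db.builder("alice").await.build().await?;

    let row = RowRef::new("mailbox:inbox", "uid:1");
    store
        .row_insert(vec![RowVal::new(row.clone(), b"flags".to_vec())])
        .await?;

    // The fetched row_ref carries the stringified version counter as its
    // causality token; re-inserting with that token replaces the value
    // instead of accumulating a concurrent alternative.
    let fetched = store.row_fetch(&Selector::Single(&row)).await?;
    assert_eq!(fetched.len(), 1);
    Ok(())
}
```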
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
deleted file mode 100644
index 1f86f71..0000000
--- a/src/storage/mod.rs
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- *
- * This abstraction goal is to leverage all the semantic of Garage K2V+S3,
- * to be as tailored as possible to it ; it aims to be a zero-cost abstraction
- * compared to when we where directly using the K2V+S3 client.
- *
- * My idea: we can encapsulate the causality token
- * into the object system so it is not exposed.
- */
-
-pub mod garage;
-pub mod in_memory;
-
-use async_trait::async_trait;
-use std::collections::HashMap;
-use std::hash::Hash;
-use std::sync::Arc;
-
-#[derive(Debug, Clone)]
-pub enum Alternative {
-    Tombstone,
-    Value(Vec<u8>),
-}
-type ConcurrentValues = Vec<Alternative>;
-
-#[derive(Debug, Clone)]
-pub enum StorageError {
-    NotFound,
-    Internal,
-}
-impl std::fmt::Display for StorageError {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.write_str("Storage Error: ")?;
-        match self {
-            Self::NotFound => f.write_str("Item not found"),
-            Self::Internal => f.write_str("An internal error occured"),
-        }
-    }
-}
-impl std::error::Error for StorageError {}
-
-#[derive(Debug, Clone, PartialEq)]
-pub struct RowUid {
-    pub shard: String,
-    pub sort: String,
-}
-
-#[derive(Debug, Clone, PartialEq)]
-pub struct RowRef {
-    pub uid: RowUid,
-    pub causality: Option<String>,
-}
-impl std::fmt::Display for RowRef {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(
-            f,
-            "RowRef({}, {}, {:?})",
-            self.uid.shard, self.uid.sort, self.causality
-        )
-    }
-}
-
-impl RowRef {
-    pub fn new(shard: &str, sort: &str) -> Self {
-        Self {
-            uid: RowUid {
-                shard: shard.to_string(),
-                sort: sort.to_string(),
-            },
-            causality: None,
-        }
-    }
-    pub fn with_causality(mut self, causality: String) -> Self {
-        self.causality = Some(causality);
-        self
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct RowVal {
-    pub row_ref: RowRef,
-    pub value: ConcurrentValues,
-}
-
-impl RowVal {
-    pub fn new(row_ref: RowRef, value: Vec<u8>) -> Self {
-        Self {
-            row_ref,
-            value: vec![Alternative::Value(value)],
-        }
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct BlobRef(pub String);
-impl std::fmt::Display for BlobRef {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "BlobRef({})", self.0)
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct BlobVal {
-    pub blob_ref: BlobRef,
-    pub meta: HashMap<String, String>,
-    pub value: Vec<u8>,
-}
-impl BlobVal {
-    pub fn new(blob_ref: BlobRef, value: Vec<u8>) -> Self {
-        Self {
-            blob_ref,
-            value,
-            meta: HashMap::new(),
-        }
-    }
-
-    pub fn with_meta(mut self, k: String, v: String) -> Self {
-        self.meta.insert(k, v);
-        self
-    }
-}
-
-#[derive(Debug)]
-pub enum Selector<'a> {
-    Range {
-        shard: &'a str,
-        sort_begin: &'a str,
-        sort_end: &'a str,
-    },
-    List(Vec<RowRef>), // list of (shard_key, sort_key)
-    #[allow(dead_code)]
-    Prefix {
-        shard: &'a str,
-        sort_prefix: &'a str,
-    },
-    Single(&'a RowRef),
-}
-impl<'a> std::fmt::Display for Selector<'a> {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match self {
-            Self::Range {
-                shard,
-                sort_begin,
-                sort_end,
-            } => write!(f, "Range({}, [{}, {}[)", shard, sort_begin, sort_end),
-            Self::List(list) => write!(f, "List({:?})", list),
-            Self::Prefix { shard, sort_prefix } => write!(f, "Prefix({}, {})", shard, sort_prefix),
-            Self::Single(row_ref) => write!(f, "Single({})", row_ref),
-        }
-    }
-}
-
-#[async_trait]
-pub trait IStore {
-    async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError>;
-    async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError>;
-    async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError>;
-    async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError>;
-
-    async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError>;
-    async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError>;
-    async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError>;
-    async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError>;
-    async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>;
-}
-
-#[derive(Clone, Debug, PartialEq, Eq, Hash)]
-pub struct UnicityBuffer(Vec<u8>);
-
-#[async_trait]
-pub trait IBuilder: std::fmt::Debug {
-    async fn build(&self) -> Result<Store, StorageError>;
-
-    /// Returns an opaque buffer that uniquely identifies this builder
-    fn unique(&self) -> UnicityBuffer;
-}
-
-pub type Builder = Arc<dyn IBuilder + Send + Sync>;
-pub type Store = Box<dyn IStore + Send + Sync>;
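For reference, the four Selector shapes the deleted IStore API accepted, sketched with invented shard and sort keys; note how the causality token travels inside RowRef rather than as a separate argument, which is exactly the encapsulation the header comment describes:

```rust
// Sketch only: the shard "mb" and the "i:..." sort keys are invented.
fn example_selectors(one: &RowRef) -> Vec<Selector<'_>> {
    vec![
        // Half-open range [sort_begin, sort_end) within one shard.
        Selector::Range { shard: "mb", sort_begin: "i:0001", sort_end: "i:0010" },
        // Every sort key starting with the given prefix.
        Selector::Prefix { shard: "mb", sort_prefix: "i:" },
        // An explicit list of (shard, sort) pairs.
        Selector::List(vec![RowRef::new("mb", "i:0001"), RowRef::new("mb", "i:0002")]),
        // Exactly one row, optionally carrying its causality token.
        Selector::Single(one),
    ]
}
```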