aboutsummaryrefslogtreecommitdiff
path: root/src/storage
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2024-03-08 08:17:03 +0100
committerQuentin Dufour <quentin@deuxfleurs.fr>2024-03-08 08:17:03 +0100
commit1a43ce5ac7033c148f64a033f2b1d335e95e11d5 (patch)
tree60b234604170fe207248458a9c4cdd3f4b7c36f2 /src/storage
parentbb9cb386b65834c44cae86bd100f800883022062 (diff)
downloadaerogramme-1a43ce5ac7033c148f64a033f2b1d335e95e11d5.tar.gz
aerogramme-1a43ce5ac7033c148f64a033f2b1d335e95e11d5.zip
WIP refactor
Diffstat (limited to 'src/storage')
-rw-r--r--src/storage/garage.rs538
-rw-r--r--src/storage/in_memory.rs334
-rw-r--r--src/storage/mod.rs179
3 files changed, 0 insertions, 1051 deletions
diff --git a/src/storage/garage.rs b/src/storage/garage.rs
deleted file mode 100644
index 7152764..0000000
--- a/src/storage/garage.rs
+++ /dev/null
@@ -1,538 +0,0 @@
-use aws_sdk_s3::{self as s3, error::SdkError, operation::get_object::GetObjectError};
-use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder;
-use aws_smithy_runtime_api::client::http::SharedHttpClient;
-use hyper_rustls::HttpsConnector;
-use hyper_util::client::legacy::{connect::HttpConnector, Client as HttpClient};
-use hyper_util::rt::TokioExecutor;
-use serde::Serialize;
-
-use crate::storage::*;
-
-pub struct GarageRoot {
- k2v_http: HttpClient<HttpsConnector<HttpConnector>, k2v_client::Body>,
- aws_http: SharedHttpClient,
-}
-
-impl GarageRoot {
- pub fn new() -> anyhow::Result<Self> {
- let connector = hyper_rustls::HttpsConnectorBuilder::new()
- .with_native_roots()?
- .https_or_http()
- .enable_http1()
- .enable_http2()
- .build();
- let k2v_http = HttpClient::builder(TokioExecutor::new()).build(connector);
- let aws_http = HyperClientBuilder::new().build_https();
- Ok(Self { k2v_http, aws_http })
- }
-
- pub fn user(&self, conf: GarageConf) -> anyhow::Result<Arc<GarageUser>> {
- let mut unicity: Vec<u8> = vec![];
- unicity.extend_from_slice(file!().as_bytes());
- unicity.append(&mut rmp_serde::to_vec(&conf)?);
-
- Ok(Arc::new(GarageUser {
- conf,
- aws_http: self.aws_http.clone(),
- k2v_http: self.k2v_http.clone(),
- unicity,
- }))
- }
-}
-
-#[derive(Clone, Debug, Serialize)]
-pub struct GarageConf {
- pub region: String,
- pub s3_endpoint: String,
- pub k2v_endpoint: String,
- pub aws_access_key_id: String,
- pub aws_secret_access_key: String,
- pub bucket: String,
-}
-
-//@FIXME we should get rid of this builder
-//and allocate a S3 + K2V client only once per user
-//(and using a shared HTTP client)
-#[derive(Clone, Debug)]
-pub struct GarageUser {
- conf: GarageConf,
- aws_http: SharedHttpClient,
- k2v_http: HttpClient<HttpsConnector<HttpConnector>, k2v_client::Body>,
- unicity: Vec<u8>,
-}
-
-#[async_trait]
-impl IBuilder for GarageUser {
- async fn build(&self) -> Result<Store, StorageError> {
- let s3_creds = s3::config::Credentials::new(
- self.conf.aws_access_key_id.clone(),
- self.conf.aws_secret_access_key.clone(),
- None,
- None,
- "aerogramme",
- );
-
- let sdk_config = aws_config::from_env()
- .region(aws_config::Region::new(self.conf.region.clone()))
- .credentials_provider(s3_creds)
- .http_client(self.aws_http.clone())
- .endpoint_url(self.conf.s3_endpoint.clone())
- .load()
- .await;
-
- let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config)
- .force_path_style(true)
- .build();
-
- let s3_client = aws_sdk_s3::Client::from_conf(s3_config);
-
- let k2v_config = k2v_client::K2vClientConfig {
- endpoint: self.conf.k2v_endpoint.clone(),
- region: self.conf.region.clone(),
- aws_access_key_id: self.conf.aws_access_key_id.clone(),
- aws_secret_access_key: self.conf.aws_secret_access_key.clone(),
- bucket: self.conf.bucket.clone(),
- user_agent: None,
- };
-
- let k2v_client =
- match k2v_client::K2vClient::new_with_client(k2v_config, self.k2v_http.clone()) {
- Err(e) => {
- tracing::error!("unable to build k2v client: {}", e);
- return Err(StorageError::Internal);
- }
- Ok(v) => v,
- };
-
- Ok(Box::new(GarageStore {
- bucket: self.conf.bucket.clone(),
- s3: s3_client,
- k2v: k2v_client,
- }))
- }
- fn unique(&self) -> UnicityBuffer {
- UnicityBuffer(self.unicity.clone())
- }
-}
-
-pub struct GarageStore {
- bucket: String,
- s3: s3::Client,
- k2v: k2v_client::K2vClient,
-}
-
-fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) -> RowVal {
- let new_row_ref = row_ref.with_causality(causal_value.causality.into());
- let row_values = causal_value
- .value
- .into_iter()
- .map(|k2v_value| match k2v_value {
- k2v_client::K2vValue::Tombstone => Alternative::Tombstone,
- k2v_client::K2vValue::Value(v) => Alternative::Value(v),
- })
- .collect::<Vec<_>>();
-
- RowVal {
- row_ref: new_row_ref,
- value: row_values,
- }
-}
-
-#[async_trait]
-impl IStore for GarageStore {
- async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
- tracing::trace!(select=%select, command="row_fetch");
- let (pk_list, batch_op) = match select {
- Selector::Range {
- shard,
- sort_begin,
- sort_end,
- } => (
- vec![shard.to_string()],
- vec![k2v_client::BatchReadOp {
- partition_key: shard,
- filter: k2v_client::Filter {
- start: Some(sort_begin),
- end: Some(sort_end),
- ..k2v_client::Filter::default()
- },
- ..k2v_client::BatchReadOp::default()
- }],
- ),
- Selector::List(row_ref_list) => (
- row_ref_list
- .iter()
- .map(|row_ref| row_ref.uid.shard.to_string())
- .collect::<Vec<_>>(),
- row_ref_list
- .iter()
- .map(|row_ref| k2v_client::BatchReadOp {
- partition_key: &row_ref.uid.shard,
- filter: k2v_client::Filter {
- start: Some(&row_ref.uid.sort),
- ..k2v_client::Filter::default()
- },
- single_item: true,
- ..k2v_client::BatchReadOp::default()
- })
- .collect::<Vec<_>>(),
- ),
- Selector::Prefix { shard, sort_prefix } => (
- vec![shard.to_string()],
- vec![k2v_client::BatchReadOp {
- partition_key: shard,
- filter: k2v_client::Filter {
- prefix: Some(sort_prefix),
- ..k2v_client::Filter::default()
- },
- ..k2v_client::BatchReadOp::default()
- }],
- ),
- Selector::Single(row_ref) => {
- let causal_value = match self
- .k2v
- .read_item(&row_ref.uid.shard, &row_ref.uid.sort)
- .await
- {
- Err(k2v_client::Error::NotFound) => {
- tracing::debug!(
- "K2V item not found shard={}, sort={}, bucket={}",
- row_ref.uid.shard,
- row_ref.uid.sort,
- self.bucket,
- );
- return Err(StorageError::NotFound);
- }
- Err(e) => {
- tracing::error!(
- "K2V read item shard={}, sort={}, bucket={} failed: {}",
- row_ref.uid.shard,
- row_ref.uid.sort,
- self.bucket,
- e
- );
- return Err(StorageError::Internal);
- }
- Ok(v) => v,
- };
-
- let row_val = causal_to_row_val((*row_ref).clone(), causal_value);
- return Ok(vec![row_val]);
- }
- };
-
- let all_raw_res = match self.k2v.read_batch(&batch_op).await {
- Err(e) => {
- tracing::error!(
- "k2v read batch failed for {:?}, bucket {} with err: {}",
- select,
- self.bucket,
- e
- );
- return Err(StorageError::Internal);
- }
- Ok(v) => v,
- };
- //println!("fetch res -> {:?}", all_raw_res);
-
- let row_vals =
- all_raw_res
- .into_iter()
- .zip(pk_list.into_iter())
- .fold(vec![], |mut acc, (page, pk)| {
- page.items
- .into_iter()
- .map(|(sk, cv)| causal_to_row_val(RowRef::new(&pk, &sk), cv))
- .for_each(|rr| acc.push(rr));
-
- acc
- });
- tracing::debug!(fetch_count = row_vals.len(), command = "row_fetch");
-
- Ok(row_vals)
- }
- async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
- tracing::trace!(select=%select, command="row_rm");
- let del_op = match select {
- Selector::Range {
- shard,
- sort_begin,
- sort_end,
- } => vec![k2v_client::BatchDeleteOp {
- partition_key: shard,
- prefix: None,
- start: Some(sort_begin),
- end: Some(sort_end),
- single_item: false,
- }],
- Selector::List(row_ref_list) => {
- // Insert null values with causality token = delete
- let batch_op = row_ref_list
- .iter()
- .map(|v| k2v_client::BatchInsertOp {
- partition_key: &v.uid.shard,
- sort_key: &v.uid.sort,
- causality: v.causality.clone().map(|ct| ct.into()),
- value: k2v_client::K2vValue::Tombstone,
- })
- .collect::<Vec<_>>();
-
- return match self.k2v.insert_batch(&batch_op).await {
- Err(e) => {
- tracing::error!("Unable to delete the list of values: {}", e);
- Err(StorageError::Internal)
- }
- Ok(_) => Ok(()),
- };
- }
- Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchDeleteOp {
- partition_key: shard,
- prefix: Some(sort_prefix),
- start: None,
- end: None,
- single_item: false,
- }],
- Selector::Single(row_ref) => {
- // Insert null values with causality token = delete
- let batch_op = vec![k2v_client::BatchInsertOp {
- partition_key: &row_ref.uid.shard,
- sort_key: &row_ref.uid.sort,
- causality: row_ref.causality.clone().map(|ct| ct.into()),
- value: k2v_client::K2vValue::Tombstone,
- }];
-
- return match self.k2v.insert_batch(&batch_op).await {
- Err(e) => {
- tracing::error!("Unable to delete the list of values: {}", e);
- Err(StorageError::Internal)
- }
- Ok(_) => Ok(()),
- };
- }
- };
-
- // Finally here we only have prefix & range
- match self.k2v.delete_batch(&del_op).await {
- Err(e) => {
- tracing::error!("delete batch error: {}", e);
- Err(StorageError::Internal)
- }
- Ok(_) => Ok(()),
- }
- }
-
- async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
- tracing::trace!(entries=%values.iter().map(|v| v.row_ref.to_string()).collect::<Vec<_>>().join(","), command="row_insert");
- let batch_ops = values
- .iter()
- .map(|v| k2v_client::BatchInsertOp {
- partition_key: &v.row_ref.uid.shard,
- sort_key: &v.row_ref.uid.sort,
- causality: v.row_ref.causality.clone().map(|ct| ct.into()),
- value: v
- .value
- .iter()
- .next()
- .map(|cv| match cv {
- Alternative::Value(buff) => k2v_client::K2vValue::Value(buff.clone()),
- Alternative::Tombstone => k2v_client::K2vValue::Tombstone,
- })
- .unwrap_or(k2v_client::K2vValue::Tombstone),
- })
- .collect::<Vec<_>>();
-
- match self.k2v.insert_batch(&batch_ops).await {
- Err(e) => {
- tracing::error!("k2v can't insert some value: {}", e);
- Err(StorageError::Internal)
- }
- Ok(v) => Ok(v),
- }
- }
- async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
- tracing::trace!(entry=%value, command="row_poll");
- loop {
- if let Some(ct) = &value.causality {
- match self
- .k2v
- .poll_item(&value.uid.shard, &value.uid.sort, ct.clone().into(), None)
- .await
- {
- Err(e) => {
- tracing::error!("Unable to poll item: {}", e);
- return Err(StorageError::Internal);
- }
- Ok(None) => continue,
- Ok(Some(cv)) => return Ok(causal_to_row_val(value.clone(), cv)),
- }
- } else {
- match self.k2v.read_item(&value.uid.shard, &value.uid.sort).await {
- Err(k2v_client::Error::NotFound) => {
- self.k2v
- .insert_item(&value.uid.shard, &value.uid.sort, vec![0u8], None)
- .await
- .map_err(|e| {
- tracing::error!("Unable to insert item in polling logic: {}", e);
- StorageError::Internal
- })?;
- }
- Err(e) => {
- tracing::error!("Unable to read item in polling logic: {}", e);
- return Err(StorageError::Internal);
- }
- Ok(cv) => return Ok(causal_to_row_val(value.clone(), cv)),
- }
- }
- }
- }
-
- async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
- tracing::trace!(entry=%blob_ref, command="blob_fetch");
- let maybe_out = self
- .s3
- .get_object()
- .bucket(self.bucket.to_string())
- .key(blob_ref.0.to_string())
- .send()
- .await;
-
- let object_output = match maybe_out {
- Ok(output) => output,
- Err(SdkError::ServiceError(x)) => match x.err() {
- GetObjectError::NoSuchKey(_) => return Err(StorageError::NotFound),
- e => {
- tracing::warn!("Blob Fetch Error, Service Error: {}", e);
- return Err(StorageError::Internal);
- }
- },
- Err(e) => {
- tracing::warn!("Blob Fetch Error, {}", e);
- return Err(StorageError::Internal);
- }
- };
-
- let buffer = match object_output.body.collect().await {
- Ok(aggreg) => aggreg.to_vec(),
- Err(e) => {
- tracing::warn!("Fetching body failed with {}", e);
- return Err(StorageError::Internal);
- }
- };
-
- let mut bv = BlobVal::new(blob_ref.clone(), buffer);
- if let Some(meta) = object_output.metadata {
- bv.meta = meta;
- }
- tracing::debug!("Fetched {}/{}", self.bucket, blob_ref.0);
- Ok(bv)
- }
- async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> {
- tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert");
- let streamable_value = s3::primitives::ByteStream::from(blob_val.value);
-
- let maybe_send = self
- .s3
- .put_object()
- .bucket(self.bucket.to_string())
- .key(blob_val.blob_ref.0.to_string())
- .set_metadata(Some(blob_val.meta))
- .body(streamable_value)
- .send()
- .await;
-
- match maybe_send {
- Err(e) => {
- tracing::error!("unable to send object: {}", e);
- Err(StorageError::Internal)
- }
- Ok(_) => {
- tracing::debug!("Inserted {}/{}", self.bucket, blob_val.blob_ref.0);
- Ok(())
- }
- }
- }
- async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
- tracing::trace!(src=%src, dst=%dst, command="blob_copy");
- let maybe_copy = self
- .s3
- .copy_object()
- .bucket(self.bucket.to_string())
- .key(dst.0.clone())
- .copy_source(format!("/{}/{}", self.bucket.to_string(), src.0.clone()))
- .send()
- .await;
-
- match maybe_copy {
- Err(e) => {
- tracing::error!(
- "unable to copy object {} to {} (bucket: {}), error: {}",
- src.0,
- dst.0,
- self.bucket,
- e
- );
- Err(StorageError::Internal)
- }
- Ok(_) => {
- tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.bucket);
- Ok(())
- }
- }
- }
- async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
- tracing::trace!(prefix = prefix, command = "blob_list");
- let maybe_list = self
- .s3
- .list_objects_v2()
- .bucket(self.bucket.to_string())
- .prefix(prefix)
- .into_paginator()
- .send()
- .try_collect()
- .await;
-
- match maybe_list {
- Err(e) => {
- tracing::error!(
- "listing prefix {} on bucket {} failed: {}",
- prefix,
- self.bucket,
- e
- );
- Err(StorageError::Internal)
- }
- Ok(pagin_list_out) => Ok(pagin_list_out
- .into_iter()
- .map(|list_out| list_out.contents.unwrap_or(vec![]))
- .flatten()
- .map(|obj| BlobRef(obj.key.unwrap_or(String::new())))
- .collect::<Vec<_>>()),
- }
- }
- async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
- tracing::trace!(entry=%blob_ref, command="blob_rm");
- let maybe_delete = self
- .s3
- .delete_object()
- .bucket(self.bucket.to_string())
- .key(blob_ref.0.clone())
- .send()
- .await;
-
- match maybe_delete {
- Err(e) => {
- tracing::error!(
- "unable to delete {} (bucket: {}), error {}",
- blob_ref.0,
- self.bucket,
- e
- );
- Err(StorageError::Internal)
- }
- Ok(_) => {
- tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.bucket);
- Ok(())
- }
- }
- }
-}
diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs
deleted file mode 100644
index 3c3a94c..0000000
--- a/src/storage/in_memory.rs
+++ /dev/null
@@ -1,334 +0,0 @@
-use crate::storage::*;
-use std::collections::{BTreeMap, HashMap};
-use std::ops::Bound::{self, Excluded, Included, Unbounded};
-use std::sync::{Arc, RwLock};
-use tokio::sync::Notify;
-
-/// This implementation is very inneficient, and not completely correct
-/// Indeed, when the connector is dropped, the memory is freed.
-/// It means that when a user disconnects, its data are lost.
-/// It's intended only for basic debugging, do not use it for advanced tests...
-
-#[derive(Debug, Default)]
-pub struct MemDb(tokio::sync::Mutex<HashMap<String, Arc<MemBuilder>>>);
-impl MemDb {
- pub fn new() -> Self {
- Self(tokio::sync::Mutex::new(HashMap::new()))
- }
-
- pub async fn builder(&self, username: &str) -> Arc<MemBuilder> {
- let mut global_storage = self.0.lock().await;
- global_storage
- .entry(username.to_string())
- .or_insert(MemBuilder::new(username))
- .clone()
- }
-}
-
-#[derive(Debug, Clone)]
-enum InternalData {
- Tombstone,
- Value(Vec<u8>),
-}
-impl InternalData {
- fn to_alternative(&self) -> Alternative {
- match self {
- Self::Tombstone => Alternative::Tombstone,
- Self::Value(x) => Alternative::Value(x.clone()),
- }
- }
-}
-
-#[derive(Debug)]
-struct InternalRowVal {
- data: Vec<InternalData>,
- version: u64,
- change: Arc<Notify>,
-}
-impl std::default::Default for InternalRowVal {
- fn default() -> Self {
- Self {
- data: vec![],
- version: 1,
- change: Arc::new(Notify::new()),
- }
- }
-}
-impl InternalRowVal {
- fn concurrent_values(&self) -> Vec<Alternative> {
- self.data.iter().map(InternalData::to_alternative).collect()
- }
-
- fn to_row_val(&self, row_ref: RowRef) -> RowVal {
- RowVal {
- row_ref: row_ref.with_causality(self.version.to_string()),
- value: self.concurrent_values(),
- }
- }
-}
-
-#[derive(Debug, Default, Clone)]
-struct InternalBlobVal {
- data: Vec<u8>,
- metadata: HashMap<String, String>,
-}
-impl InternalBlobVal {
- fn to_blob_val(&self, bref: &BlobRef) -> BlobVal {
- BlobVal {
- blob_ref: bref.clone(),
- meta: self.metadata.clone(),
- value: self.data.clone(),
- }
- }
-}
-
-type ArcRow = Arc<RwLock<HashMap<String, BTreeMap<String, InternalRowVal>>>>;
-type ArcBlob = Arc<RwLock<BTreeMap<String, InternalBlobVal>>>;
-
-#[derive(Clone, Debug)]
-pub struct MemBuilder {
- unicity: Vec<u8>,
- row: ArcRow,
- blob: ArcBlob,
-}
-
-impl MemBuilder {
- pub fn new(user: &str) -> Arc<Self> {
- tracing::debug!("initialize membuilder for {}", user);
- let mut unicity: Vec<u8> = vec![];
- unicity.extend_from_slice(file!().as_bytes());
- unicity.extend_from_slice(user.as_bytes());
- Arc::new(Self {
- unicity,
- row: Arc::new(RwLock::new(HashMap::new())),
- blob: Arc::new(RwLock::new(BTreeMap::new())),
- })
- }
-}
-
-#[async_trait]
-impl IBuilder for MemBuilder {
- async fn build(&self) -> Result<Store, StorageError> {
- Ok(Box::new(MemStore {
- row: self.row.clone(),
- blob: self.blob.clone(),
- }))
- }
-
- fn unique(&self) -> UnicityBuffer {
- UnicityBuffer(self.unicity.clone())
- }
-}
-
-pub struct MemStore {
- row: ArcRow,
- blob: ArcBlob,
-}
-
-fn prefix_last_bound(prefix: &str) -> Bound<String> {
- let mut sort_end = prefix.to_string();
- match sort_end.pop() {
- None => Unbounded,
- Some(ch) => {
- let nc = char::from_u32(ch as u32 + 1).unwrap();
- sort_end.push(nc);
- Excluded(sort_end)
- }
- }
-}
-
-impl MemStore {
- fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> {
- tracing::trace!(entry=%entry, command="row_rm_single");
- let mut store = self.row.write().or(Err(StorageError::Internal))?;
- let shard = &entry.uid.shard;
- let sort = &entry.uid.sort;
-
- let cauz = match entry.causality.as_ref().map(|v| v.parse::<u64>()) {
- Some(Ok(v)) => v,
- _ => 0,
- };
-
- let bt = store.entry(shard.to_string()).or_default();
- let intval = bt.entry(sort.to_string()).or_default();
-
- if cauz == intval.version {
- intval.data.clear();
- }
- intval.data.push(InternalData::Tombstone);
- intval.version += 1;
- intval.change.notify_waiters();
-
- Ok(())
- }
-}
-
-#[async_trait]
-impl IStore for MemStore {
- async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
- tracing::trace!(select=%select, command="row_fetch");
- let store = self.row.read().or(Err(StorageError::Internal))?;
-
- match select {
- Selector::Range {
- shard,
- sort_begin,
- sort_end,
- } => Ok(store
- .get(*shard)
- .unwrap_or(&BTreeMap::new())
- .range((
- Included(sort_begin.to_string()),
- Excluded(sort_end.to_string()),
- ))
- .map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
- .collect::<Vec<_>>()),
- Selector::List(rlist) => {
- let mut acc = vec![];
- for row_ref in rlist {
- let maybe_intval = store
- .get(&row_ref.uid.shard)
- .map(|v| v.get(&row_ref.uid.sort))
- .flatten();
- if let Some(intval) = maybe_intval {
- acc.push(intval.to_row_val(row_ref.clone()));
- }
- }
- Ok(acc)
- }
- Selector::Prefix { shard, sort_prefix } => {
- let last_bound = prefix_last_bound(sort_prefix);
-
- Ok(store
- .get(*shard)
- .unwrap_or(&BTreeMap::new())
- .range((Included(sort_prefix.to_string()), last_bound))
- .map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
- .collect::<Vec<_>>())
- }
- Selector::Single(row_ref) => {
- let intval = store
- .get(&row_ref.uid.shard)
- .ok_or(StorageError::NotFound)?
- .get(&row_ref.uid.sort)
- .ok_or(StorageError::NotFound)?;
- Ok(vec![intval.to_row_val((*row_ref).clone())])
- }
- }
- }
-
- async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
- tracing::trace!(select=%select, command="row_rm");
-
- let values = match select {
- Selector::Range { .. } | Selector::Prefix { .. } => self
- .row_fetch(select)
- .await?
- .into_iter()
- .map(|rv| rv.row_ref)
- .collect::<Vec<_>>(),
- Selector::List(rlist) => rlist.clone(),
- Selector::Single(row_ref) => vec![(*row_ref).clone()],
- };
-
- for v in values.into_iter() {
- self.row_rm_single(&v)?;
- }
- Ok(())
- }
-
- async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
- tracing::trace!(entries=%values.iter().map(|v| v.row_ref.to_string()).collect::<Vec<_>>().join(","), command="row_insert");
- let mut store = self.row.write().or(Err(StorageError::Internal))?;
- for v in values.into_iter() {
- let shard = v.row_ref.uid.shard;
- let sort = v.row_ref.uid.sort;
-
- let val = match v.value.into_iter().next() {
- Some(Alternative::Value(x)) => x,
- _ => vec![],
- };
-
- let cauz = match v.row_ref.causality.map(|v| v.parse::<u64>()) {
- Some(Ok(v)) => v,
- _ => 0,
- };
-
- let bt = store.entry(shard).or_default();
- let intval = bt.entry(sort).or_default();
-
- if cauz == intval.version {
- intval.data.clear();
- }
- intval.data.push(InternalData::Value(val));
- intval.version += 1;
- intval.change.notify_waiters();
- }
- Ok(())
- }
- async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
- tracing::trace!(entry=%value, command="row_poll");
- let shard = &value.uid.shard;
- let sort = &value.uid.sort;
- let cauz = match value.causality.as_ref().map(|v| v.parse::<u64>()) {
- Some(Ok(v)) => v,
- _ => 0,
- };
-
- let notify_me = {
- let mut store = self.row.write().or(Err(StorageError::Internal))?;
- let bt = store.entry(shard.to_string()).or_default();
- let intval = bt.entry(sort.to_string()).or_default();
-
- if intval.version != cauz {
- return Ok(intval.to_row_val(value.clone()));
- }
- intval.change.clone()
- };
-
- notify_me.notified().await;
-
- let res = self.row_fetch(&Selector::Single(value)).await?;
- res.into_iter().next().ok_or(StorageError::NotFound)
- }
-
- async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
- tracing::trace!(entry=%blob_ref, command="blob_fetch");
- let store = self.blob.read().or(Err(StorageError::Internal))?;
- store
- .get(&blob_ref.0)
- .ok_or(StorageError::NotFound)
- .map(|v| v.to_blob_val(blob_ref))
- }
- async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> {
- tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert");
- let mut store = self.blob.write().or(Err(StorageError::Internal))?;
- let entry = store.entry(blob_val.blob_ref.0.clone()).or_default();
- entry.data = blob_val.value.clone();
- entry.metadata = blob_val.meta.clone();
- Ok(())
- }
- async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
- tracing::trace!(src=%src, dst=%dst, command="blob_copy");
- let mut store = self.blob.write().or(Err(StorageError::Internal))?;
- let blob_src = store.entry(src.0.clone()).or_default().clone();
- store.insert(dst.0.clone(), blob_src);
- Ok(())
- }
- async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
- tracing::trace!(prefix = prefix, command = "blob_list");
- let store = self.blob.read().or(Err(StorageError::Internal))?;
- let last_bound = prefix_last_bound(prefix);
- let blist = store
- .range((Included(prefix.to_string()), last_bound))
- .map(|(k, _)| BlobRef(k.to_string()))
- .collect::<Vec<_>>();
- Ok(blist)
- }
- async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
- tracing::trace!(entry=%blob_ref, command="blob_rm");
- let mut store = self.blob.write().or(Err(StorageError::Internal))?;
- store.remove(&blob_ref.0);
- Ok(())
- }
-}
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
deleted file mode 100644
index 1f86f71..0000000
--- a/src/storage/mod.rs
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- *
- * This abstraction goal is to leverage all the semantic of Garage K2V+S3,
- * to be as tailored as possible to it ; it aims to be a zero-cost abstraction
- * compared to when we where directly using the K2V+S3 client.
- *
- * My idea: we can encapsulate the causality token
- * into the object system so it is not exposed.
- */
-
-pub mod garage;
-pub mod in_memory;
-
-use async_trait::async_trait;
-use std::collections::HashMap;
-use std::hash::Hash;
-use std::sync::Arc;
-
-#[derive(Debug, Clone)]
-pub enum Alternative {
- Tombstone,
- Value(Vec<u8>),
-}
-type ConcurrentValues = Vec<Alternative>;
-
-#[derive(Debug, Clone)]
-pub enum StorageError {
- NotFound,
- Internal,
-}
-impl std::fmt::Display for StorageError {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.write_str("Storage Error: ")?;
- match self {
- Self::NotFound => f.write_str("Item not found"),
- Self::Internal => f.write_str("An internal error occured"),
- }
- }
-}
-impl std::error::Error for StorageError {}
-
-#[derive(Debug, Clone, PartialEq)]
-pub struct RowUid {
- pub shard: String,
- pub sort: String,
-}
-
-#[derive(Debug, Clone, PartialEq)]
-pub struct RowRef {
- pub uid: RowUid,
- pub causality: Option<String>,
-}
-impl std::fmt::Display for RowRef {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(
- f,
- "RowRef({}, {}, {:?})",
- self.uid.shard, self.uid.sort, self.causality
- )
- }
-}
-
-impl RowRef {
- pub fn new(shard: &str, sort: &str) -> Self {
- Self {
- uid: RowUid {
- shard: shard.to_string(),
- sort: sort.to_string(),
- },
- causality: None,
- }
- }
- pub fn with_causality(mut self, causality: String) -> Self {
- self.causality = Some(causality);
- self
- }
-}
-
-#[derive(Debug, Clone)]
-pub struct RowVal {
- pub row_ref: RowRef,
- pub value: ConcurrentValues,
-}
-
-impl RowVal {
- pub fn new(row_ref: RowRef, value: Vec<u8>) -> Self {
- Self {
- row_ref,
- value: vec![Alternative::Value(value)],
- }
- }
-}
-
-#[derive(Debug, Clone)]
-pub struct BlobRef(pub String);
-impl std::fmt::Display for BlobRef {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "BlobRef({})", self.0)
- }
-}
-
-#[derive(Debug, Clone)]
-pub struct BlobVal {
- pub blob_ref: BlobRef,
- pub meta: HashMap<String, String>,
- pub value: Vec<u8>,
-}
-impl BlobVal {
- pub fn new(blob_ref: BlobRef, value: Vec<u8>) -> Self {
- Self {
- blob_ref,
- value,
- meta: HashMap::new(),
- }
- }
-
- pub fn with_meta(mut self, k: String, v: String) -> Self {
- self.meta.insert(k, v);
- self
- }
-}
-
-#[derive(Debug)]
-pub enum Selector<'a> {
- Range {
- shard: &'a str,
- sort_begin: &'a str,
- sort_end: &'a str,
- },
- List(Vec<RowRef>), // list of (shard_key, sort_key)
- #[allow(dead_code)]
- Prefix {
- shard: &'a str,
- sort_prefix: &'a str,
- },
- Single(&'a RowRef),
-}
-impl<'a> std::fmt::Display for Selector<'a> {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- Self::Range {
- shard,
- sort_begin,
- sort_end,
- } => write!(f, "Range({}, [{}, {}[)", shard, sort_begin, sort_end),
- Self::List(list) => write!(f, "List({:?})", list),
- Self::Prefix { shard, sort_prefix } => write!(f, "Prefix({}, {})", shard, sort_prefix),
- Self::Single(row_ref) => write!(f, "Single({})", row_ref),
- }
- }
-}
-
-#[async_trait]
-pub trait IStore {
- async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError>;
- async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError>;
- async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError>;
- async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError>;
-
- async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError>;
- async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError>;
- async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError>;
- async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError>;
- async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>;
-}
-
-#[derive(Clone, Debug, PartialEq, Eq, Hash)]
-pub struct UnicityBuffer(Vec<u8>);
-
-#[async_trait]
-pub trait IBuilder: std::fmt::Debug {
- async fn build(&self) -> Result<Store, StorageError>;
-
- /// Returns an opaque buffer that uniquely identifies this builder
- fn unique(&self) -> UnicityBuffer;
-}
-
-pub type Builder = Arc<dyn IBuilder + Send + Sync>;
-pub type Store = Box<dyn IStore + Send + Sync>;