Diffstat (limited to 'src')
-rw-r--r--   src/mail/incoming.rs       2
-rw-r--r--   src/mail/mailbox.rs        7
-rw-r--r--   src/storage/garage.rs      8
-rw-r--r--   src/storage/in_memory.rs 242
-rw-r--r--   src/storage/mod.rs         9
5 files changed, 202 insertions(+), 66 deletions(-)
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs
index f6b831d..2a6c947 100644
--- a/src/mail/incoming.rs
+++ b/src/mail/incoming.rs
@@ -386,7 +386,7 @@ async fn k2v_lock_loop_internal(
_ => None,
};
if let Some(ct) = release {
- match storage.row_rm(&storage::Selector::Single(&ct)).await {
+ match storage.row_rm_single(&ct).await {
Err(e) => warn!("Unable to release lock {:?}: {}", ct, e),
Ok(_) => (),
};
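
Note on the call-site change above: `row_rm_single` removes exactly the row version identified by the `RowRef`'s causality token, instead of resolving a selector. The merge rule the in-memory backend applies (see `src/storage/in_memory.rs` further down) can be modelled on plain types; the names below are illustrative only, not crate APIs:

// Minimal model of the causality rule behind row_rm_single (illustrative
// names, not crate APIs): concurrent values are superseded only when the
// caller's token matches the stored version; a tombstone is always appended.
struct Cell {
    version: u64,
    alternatives: Vec<Option<Vec<u8>>>, // None stands in for a tombstone
}

fn rm_single(observed_version: u64, cell: &mut Cell) {
    if observed_version == cell.version {
        cell.alternatives.clear(); // the caller saw everything currently stored
    }
    cell.alternatives.push(None); // tombstone
    cell.version += 1;
}
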
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs
index b4afd5e..65f44b1 100644
--- a/src/mail/mailbox.rs
+++ b/src/mail/mailbox.rs
@@ -361,7 +361,12 @@ impl MailboxInternal {
async {
// Delete mail meta from K2V
let sk = ident.to_string();
- self.storage.row_rm(&Selector::Single(&RowRef::new(&self.mail_path, &sk))).await?;
+ let res = self.storage
+ .row_fetch(&storage::Selector::Single(&RowRef::new(&self.mail_path, &sk)))
+ .await?;
+ if let Some(row_val) = res.into_iter().next() {
+ self.storage.row_rm_single(&row_val.row_ref).await?;
+ }
Ok::<_, anyhow::Error>(())
}
)?;
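
The deletion path above now fetches before removing because `RowRef::new(&self.mail_path, &sk)` carries no causality token; only the `row_ref` returned by `row_fetch` does. A minimal sketch of that fetch-then-remove pattern, assuming the types from `src/storage/mod.rs` in this diff (the helper name is hypothetical):

use crate::storage::*;

// Hypothetical helper mirroring the pattern above: fetch the single row to
// learn its causality token, then tombstone exactly the observed version.
async fn rm_observed(storage: &dyn IStore, target: RowRef) -> Result<(), StorageError> {
    let rows = storage.row_fetch(&Selector::Single(&target)).await?;
    if let Some(row) = rows.into_iter().next() {
        storage.row_rm_single(&row.row_ref).await?;
    }
    Ok(())
}
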
diff --git a/src/storage/garage.rs b/src/storage/garage.rs
index ff37287..f202067 100644
--- a/src/storage/garage.rs
+++ b/src/storage/garage.rs
@@ -56,14 +56,18 @@ impl IStore for GarageStore {
unimplemented!();
}
+ async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> {
+ unimplemented!();
+ }
+
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
unimplemented!();
}
- async fn blob_insert(&self, blob_val: &BlobVal) -> Result<BlobVal, StorageError> {
+ async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError> {
unimplemented!();
}
- async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<BlobVal, StorageError> {
+ async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
unimplemented!();
}
diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs
index 6d0460f..c18bec3 100644
--- a/src/storage/in_memory.rs
+++ b/src/storage/in_memory.rs
@@ -1,19 +1,67 @@
use crate::storage::*;
use std::collections::{HashMap, BTreeMap};
-use std::ops::Bound::{Included, Unbounded, Excluded};
+use std::ops::Bound::{Included, Unbounded, Excluded, self};
use std::sync::{Arc, RwLock};
+use tokio::sync::Notify;
/// This implementation is very inefficient, and not completely correct.
/// Indeed, when the connector is dropped, the memory is freed.
/// It means that when a user disconnects, their data are lost.
/// It's intended only for basic debugging; do not use it for advanced tests...
-pub type ArcRow = Arc<RwLock<HashMap<String, BTreeMap<String, Vec<u8>>>>>;
-pub type ArcBlob = Arc<RwLock<HashMap<String, Vec<u8>>>>;
+#[derive(Debug, Clone)]
+enum InternalData {
+ Tombstone,
+ Value(Vec<u8>),
+}
+impl InternalData {
+ fn to_alternative(&self) -> Alternative {
+ match self {
+ Self::Tombstone => Alternative::Tombstone,
+ Self::Value(x) => Alternative::Value(x.clone()),
+ }
+ }
+}
+
+#[derive(Debug, Default)]
+struct InternalRowVal {
+ data: Vec<InternalData>,
+ version: u64,
+ change: Arc<Notify>,
+}
+impl InternalRowVal {
+ fn concurrent_values(&self) -> Vec<Alternative> {
+ self.data.iter().map(InternalData::to_alternative).collect()
+ }
+
+ fn to_row_val(&self, row_ref: RowRef) -> RowVal {
+ RowVal{
+ row_ref: row_ref.with_causality(self.version.to_string()),
+ value: self.concurrent_values(),
+ }
+ }
+}
+
+#[derive(Debug, Default, Clone)]
+struct InternalBlobVal {
+ data: Vec<u8>,
+ metadata: HashMap<String, String>,
+}
+impl InternalBlobVal {
+ fn to_blob_val(&self, bref: &BlobRef) -> BlobVal {
+ BlobVal {
+ blob_ref: bref.clone(),
+ meta: self.metadata.clone(),
+ value: self.data.clone(),
+ }
+ }
+}
+
+type ArcRow = Arc<RwLock<HashMap<String, BTreeMap<String, InternalRowVal>>>>;
+type ArcBlob = Arc<RwLock<BTreeMap<String, InternalBlobVal>>>;
#[derive(Clone, Debug)]
pub struct MemBuilder {
- user: String,
unicity: Vec<u8>,
row: ArcRow,
blob: ArcBlob,
@@ -25,10 +73,9 @@ impl MemBuilder {
unicity.extend_from_slice(file!().as_bytes());
unicity.extend_from_slice(user.as_bytes());
Arc::new(Self {
- user: user.to_string(),
unicity,
row: Arc::new(RwLock::new(HashMap::new())),
- blob: Arc::new(RwLock::new(HashMap::new())),
+ blob: Arc::new(RwLock::new(BTreeMap::new())),
})
}
}
@@ -51,105 +98,180 @@ pub struct MemStore {
blob: ArcBlob,
}
-impl MemStore {
- fn inner_fetch(&self, row_ref: &RowRef) -> Result<Vec<u8>, StorageError> {
- Ok(self.row
- .read()
- .or(Err(StorageError::Internal))?
- .get(&row_ref.uid.shard)
- .ok_or(StorageError::NotFound)?
- .get(&row_ref.uid.sort)
- .ok_or(StorageError::NotFound)?
- .clone())
+fn prefix_last_bound(prefix: &str) -> Bound<String> {
+ let mut sort_end = prefix.to_string();
+ match sort_end.pop() {
+ None => Unbounded,
+ Some(ch) => {
+ let nc = char::from_u32(ch as u32 + 1).unwrap();
+ sort_end.push(nc);
+ Excluded(sort_end)
+ }
}
}
#[async_trait]
impl IStore for MemStore {
async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
+ let store = self.row.read().or(Err(StorageError::Internal))?;
+
match select {
Selector::Range { shard, sort_begin, sort_end } => {
- Ok(self.row
- .read()
- .or(Err(StorageError::Internal))?
+ Ok(store
.get(*shard)
.ok_or(StorageError::NotFound)?
.range((Included(sort_begin.to_string()), Excluded(sort_end.to_string())))
- .map(|(k, v)| RowVal {
- row_ref: RowRef { uid: RowUid { shard: shard.to_string(), sort: k.to_string() }, causality: Some("c".to_string()) },
- value: vec![Alternative::Value(v.clone())],
- })
+ .map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
.collect::<Vec<_>>())
},
Selector::List(rlist) => {
let mut acc = vec![];
for row_ref in rlist {
- let bytes = self.inner_fetch(row_ref)?;
- let row_val = RowVal {
- row_ref: row_ref.clone(),
- value: vec![Alternative::Value(bytes)]
- };
- acc.push(row_val);
+ let intval = store
+ .get(&row_ref.uid.shard)
+ .ok_or(StorageError::NotFound)?
+ .get(&row_ref.uid.sort)
+ .ok_or(StorageError::NotFound)?;
+ acc.push(intval.to_row_val(row_ref.clone()));
}
Ok(acc)
},
Selector::Prefix { shard, sort_prefix } => {
- let mut sort_end = sort_prefix.to_string();
- let last_bound = match sort_end.pop() {
- None => Unbounded,
- Some(ch) => {
- let nc = char::from_u32(ch as u32 + 1).unwrap();
- sort_end.push(nc);
- Excluded(sort_end)
- }
- };
-
- Ok(self.row
- .read()
- .or(Err(StorageError::Internal))?
+ let last_bound = prefix_last_bound(sort_prefix);
+
+ Ok(store
.get(*shard)
.ok_or(StorageError::NotFound)?
.range((Included(sort_prefix.to_string()), last_bound))
- .map(|(k, v)| RowVal {
- row_ref: RowRef { uid: RowUid { shard: shard.to_string(), sort: k.to_string() }, causality: Some("c".to_string()) },
- value: vec![Alternative::Value(v.clone())],
- })
+ .map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
.collect::<Vec<_>>())
},
Selector::Single(row_ref) => {
- let bytes = self.inner_fetch(row_ref)?;
- Ok(vec![RowVal{ row_ref: (*row_ref).clone(), value: vec![Alternative::Value(bytes)]}])
+ let intval = store
+ .get(&row_ref.uid.shard)
+ .ok_or(StorageError::NotFound)?
+ .get(&row_ref.uid.sort)
+ .ok_or(StorageError::NotFound)?;
+ Ok(vec![intval.to_row_val((*row_ref).clone())])
}
}
}
+ async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> {
+ let mut store = self.row.write().or(Err(StorageError::Internal))?;
+ let shard = &entry.uid.shard;
+ let sort = &entry.uid.sort;
+
+ let cauz = match entry.causality.as_ref().map(|v| v.parse::<u64>()) {
+ Some(Ok(v)) => v,
+ _ => 0,
+ };
+
+ let bt = store.entry(shard.to_string()).or_default();
+ let intval = bt.entry(sort.to_string()).or_default();
+
+ if cauz == intval.version {
+ intval.data.clear();
+ }
+ intval.data.push(InternalData::Tombstone);
+ intval.version += 1;
+ intval.change.notify_waiters();
+
+ Ok(())
+ }
+
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
- unimplemented!();
+ //@FIXME not efficient at all...
+ let values = self.row_fetch(select).await?;
+
+ for v in values.into_iter() {
+ self.row_rm_single(&v.row_ref).await?;
+ }
+ Ok(())
}
async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
- unimplemented!();
+ let mut store = self.row.write().or(Err(StorageError::Internal))?;
+ for v in values.into_iter() {
+ let shard = v.row_ref.uid.shard;
+ let sort = v.row_ref.uid.sort;
+
+ let val = match v.value.into_iter().next() {
+ Some(Alternative::Value(x)) => x,
+ _ => vec![],
+ };
+
+ let cauz = match v.row_ref.causality.map(|v| v.parse::<u64>()) {
+ Some(Ok(v)) => v,
+ _ => 0,
+ };
+ let bt = store.entry(shard).or_default();
+ let intval = bt.entry(sort).or_default();
+
+ if cauz == intval.version {
+ intval.data.clear();
+ }
+ intval.data.push(InternalData::Value(val));
+ intval.version += 1;
+ intval.change.notify_waiters();
+ }
+ Ok(())
}
async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
- unimplemented!();
+ let shard = &value.uid.shard;
+ let sort = &value.uid.sort;
+ let cauz = match value.causality.as_ref().map(|v| v.parse::<u64>()) {
+ Some(Ok(v)) => v,
+ _ => 0,
+ };
+
+ let notify_me = {
+ let store = self.row.read().or(Err(StorageError::Internal))?;
+ let intval = store
+ .get(shard)
+ .ok_or(StorageError::NotFound)?
+ .get(sort)
+ .ok_or(StorageError::NotFound)?;
+
+ if intval.version != cauz {
+ return Ok(intval.to_row_val(value.clone()));
+ }
+ intval.change.clone()
+ };
+
+ notify_me.notified().await;
+
+ let res = self.row_fetch(&Selector::Single(value)).await?;
+ res.into_iter().next().ok_or(StorageError::NotFound)
}
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
- unimplemented!();
-
+ let store = self.blob.read().or(Err(StorageError::Internal))?;
+ store.get(&blob_ref.0).ok_or(StorageError::NotFound).map(|v| v.to_blob_val(blob_ref))
}
- async fn blob_insert(&self, blob_val: &BlobVal) -> Result<BlobVal, StorageError> {
- unimplemented!();
+ async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError> {
+ let mut store = self.blob.write().or(Err(StorageError::Internal))?;
+ let entry = store.entry(blob_val.blob_ref.0.clone()).or_default();
+ entry.data = blob_val.value.clone();
+ entry.metadata = blob_val.meta.clone();
+ Ok(())
}
- async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<BlobVal, StorageError> {
- unimplemented!();
-
+ async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
+ let mut store = self.blob.write().or(Err(StorageError::Internal))?;
+ let blob_src = store.entry(src.0.clone()).or_default().clone();
+ store.insert(dst.0.clone(), blob_src);
+ Ok(())
}
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
- unimplemented!();
+ let store = self.blob.read().or(Err(StorageError::Internal))?;
+ let last_bound = prefix_last_bound(prefix);
+ let blist = store.range((Included(prefix.to_string()), last_bound)).map(|(k, _)| BlobRef(k.to_string())).collect::<Vec<_>>();
+ Ok(blist)
}
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
- unimplemented!();
+ let mut store = self.blob.write().or(Err(StorageError::Internal))?;
+ store.remove(&blob_ref.0);
+ Ok(())
}
}
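
The prefix scans above rely on `prefix_last_bound`, which builds an exclusive upper bound by incrementing the prefix's last character, so a `BTreeMap::range` covers exactly the keys that start with the prefix (modulo the `char` boundary edge cases the `unwrap` ignores). A self-contained sketch of the same idea over a plain `BTreeMap`, with names of my own choosing:

use std::collections::BTreeMap;
use std::ops::Bound::{self, Excluded, Included, Unbounded};

// Same idea as prefix_last_bound above: "mail/" scans ["mail/", "mail0"),
// so "mail/1" and "mail/2" match but "mbox" does not.
fn prefix_bounds(prefix: &str) -> (Bound<String>, Bound<String>) {
    let mut end = prefix.to_string();
    let upper = match end.pop() {
        None => Unbounded,
        Some(ch) => {
            end.push(char::from_u32(ch as u32 + 1).expect("no valid successor char"));
            Excluded(end)
        }
    };
    (Included(prefix.to_string()), upper)
}

fn main() {
    let mut store: BTreeMap<String, ()> = BTreeMap::new();
    for k in ["mail/1", "mail/2", "mbox", "other"] {
        store.insert(k.to_string(), ());
    }
    let keys: Vec<&String> = store.range(prefix_bounds("mail/")).map(|(k, _)| k).collect();
    assert_eq!(keys, vec!["mail/1", "mail/2"]);
}
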
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 8004ac5..a21e07d 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -61,6 +61,10 @@ impl RowRef {
causality: None,
}
}
+ pub fn with_causality(mut self, causality: String) -> Self {
+ self.causality = Some(causality);
+ self
+ }
}
#[derive(Debug, Clone)]
@@ -118,12 +122,13 @@ pub enum Selector<'a> {
pub trait IStore {
async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError>;
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError>;
+ async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError>;
async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError>;
async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError>;
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError>;
- async fn blob_insert(&self, blob_val: &BlobVal) -> Result<BlobVal, StorageError>;
- async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<BlobVal, StorageError>;
+ async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError>;
+ async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError>;
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError>;
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>;
}
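
With the trait changes in `src/storage/mod.rs`, `blob_insert` and `blob_copy` no longer hand back a `BlobVal`, so callers that need the stored bytes must re-fetch. A minimal round-trip sketch against the revised `IStore` trait, assuming `BlobVal`/`BlobRef` are constructible as they are inside `src/storage/in_memory.rs` (the function name is hypothetical):

use crate::storage::*;

// Hypothetical round-trip against the revised trait: insert and copy now
// return Ok(()) on success, so the copied bytes are observed via blob_fetch.
async fn blob_roundtrip(store: &dyn IStore, name: &str, payload: Vec<u8>) -> Result<Vec<u8>, StorageError> {
    let src = BlobRef(name.to_string());
    let val = BlobVal {
        blob_ref: src.clone(),
        meta: Default::default(),
        value: payload,
    };
    store.blob_insert(&val).await?;         // Ok(()) on success
    let dst = BlobRef(format!("{}.copy", name));
    store.blob_copy(&src, &dst).await?;     // Ok(()) on success
    Ok(store.blob_fetch(&dst).await?.value) // read back the copy
}
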