aboutsummaryrefslogtreecommitdiff
path: root/aero-collections/src
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2024-05-16 17:38:34 +0200
committerQuentin Dufour <quentin@deuxfleurs.fr>2024-05-16 17:38:34 +0200
commit32dfd25f570b7a55bf43752684d286be0f6b2dc2 (patch)
treedd77871cda851bb5795743a3f04be61cf4c3ad61 /aero-collections/src
parent6b9542088cd1b66af46e95b787493b601accb495 (diff)
downloadaerogramme-32dfd25f570b7a55bf43752684d286be0f6b2dc2.tar.gz
aerogramme-32dfd25f570b7a55bf43752684d286be0f6b2dc2.zip
format + WIP calendar-query
Diffstat (limited to 'aero-collections/src')
-rw-r--r--aero-collections/src/calendar/mod.rs17
-rw-r--r--aero-collections/src/calendar/namespace.rs43
-rw-r--r--aero-collections/src/davdag.rs39
-rw-r--r--aero-collections/src/lib.rs6
-rw-r--r--aero-collections/src/mail/incoming.rs4
-rw-r--r--aero-collections/src/mail/mailbox.rs6
-rw-r--r--aero-collections/src/mail/mod.rs2
-rw-r--r--aero-collections/src/mail/namespace.rs6
-rw-r--r--aero-collections/src/mail/snapshot.rs2
-rw-r--r--aero-collections/src/mail/uidindex.rs2
-rw-r--r--aero-collections/src/user.rs7
11 files changed, 70 insertions, 64 deletions
diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs
index 028cf87..cd05328 100644
--- a/aero-collections/src/calendar/mod.rs
+++ b/aero-collections/src/calendar/mod.rs
@@ -4,12 +4,12 @@ use anyhow::{anyhow, bail, Result};
use tokio::sync::RwLock;
use aero_bayou::Bayou;
-use aero_user::login::Credentials;
use aero_user::cryptoblob::{self, gen_key, Key};
+use aero_user::login::Credentials;
use aero_user::storage::{self, BlobRef, BlobVal, Store};
+use crate::davdag::{BlobId, DavDag, IndexEntry, SyncChange, Token};
use crate::unique_ident::*;
-use crate::davdag::{DavDag, IndexEntry, Token, BlobId, SyncChange};
pub struct Calendar {
pub(super) id: UniqueIdent,
@@ -17,10 +17,7 @@ pub struct Calendar {
}
impl Calendar {
- pub(crate) async fn open(
- creds: &Credentials,
- id: UniqueIdent,
- ) -> Result<Self> {
+ pub(crate) async fn open(creds: &Credentials, id: UniqueIdent) -> Result<Self> {
let bayou_path = format!("calendar/dag/{}", id);
let cal_path = format!("calendar/events/{}", id);
@@ -126,7 +123,7 @@ impl CalendarInternal {
async fn put<'a>(&mut self, name: &str, evt: &'a [u8]) -> Result<(Token, IndexEntry)> {
let message_key = gen_key();
let blob_id = gen_ident();
-
+
let encrypted_msg_key = cryptoblob::seal(&message_key.as_ref(), &self.encryption_key)?;
let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_msg_key);
@@ -138,9 +135,7 @@ impl CalendarInternal {
)
.with_meta(MESSAGE_KEY.to_string(), key_header);
- let etag = self.storage
- .blob_insert(blob_val)
- .await?;
+ let etag = self.storage.blob_insert(blob_val).await?;
// Add entry to Bayou
let entry: IndexEntry = (blob_id, name.to_string(), etag);
@@ -181,7 +176,7 @@ impl CalendarInternal {
let heads = davstate.heads_vec();
let token = match heads.as_slice() {
- [ token ] => *token,
+ [token] => *token,
_ => {
let op_mg = davstate.op_merge();
let token = op_mg.token();
diff --git a/aero-collections/src/calendar/namespace.rs b/aero-collections/src/calendar/namespace.rs
index 9c21d19..db65703 100644
--- a/aero-collections/src/calendar/namespace.rs
+++ b/aero-collections/src/calendar/namespace.rs
@@ -1,16 +1,16 @@
use anyhow::{bail, Result};
-use std::collections::{HashMap, BTreeMap};
-use std::sync::{Weak, Arc};
+use std::collections::{BTreeMap, HashMap};
+use std::sync::{Arc, Weak};
use serde::{Deserialize, Serialize};
use aero_bayou::timestamp::now_msec;
-use aero_user::storage;
use aero_user::cryptoblob::{open_deserialize, seal_serialize};
+use aero_user::storage;
+use super::Calendar;
use crate::unique_ident::{gen_ident, UniqueIdent};
use crate::user::User;
-use super::Calendar;
pub(crate) const CAL_LIST_PK: &str = "calendars";
pub(crate) const CAL_LIST_SK: &str = "list";
@@ -46,7 +46,7 @@ impl CalendarNs {
}
let cal = Arc::new(Calendar::open(&user.creds, id).await?);
-
+
let mut cache = self.0.lock().unwrap();
if let Some(concurrent_cal) = cache.get(&id).and_then(Weak::upgrade) {
drop(cal); // we worked for nothing but at least we didn't starve someone else
@@ -117,13 +117,15 @@ impl CalendarNs {
CalendarExists::Created(_) => (),
}
list.save(user, ct).await?;
-
+
Ok(())
}
/// Has calendar
pub async fn has(&self, user: &Arc<User>, name: &str) -> Result<bool> {
- CalendarList::load(user).await.map(|(list, _)| list.has(name))
+ CalendarList::load(user)
+ .await
+ .map(|(list, _)| list.has(name))
}
}
@@ -161,7 +163,8 @@ impl CalendarList {
for v in row_vals {
if let storage::Alternative::Value(vbytes) = v {
- let list2 = open_deserialize::<CalendarList>(&vbytes, &user.creds.keys.master)?;
+ let list2 =
+ open_deserialize::<CalendarList>(&vbytes, &user.creds.keys.master)?;
list.merge(list2);
}
}
@@ -200,7 +203,7 @@ impl CalendarList {
/// (Don't forget to save if it returns CalendarExists::Created)
fn create(&mut self, name: &str) -> CalendarExists {
if let Some(CalendarListEntry {
- id_lww: (_, Some(id))
+ id_lww: (_, Some(id)),
}) = self.0.get(name)
{
return CalendarExists::Existed(*id);
@@ -222,9 +225,10 @@ impl CalendarList {
/// For a given calendar name, get its Unique Identifier
fn get(&self, name: &str) -> Option<UniqueIdent> {
- self.0.get(name).map(|CalendarListEntry {
- id_lww: (_, ident),
- }| *ident).flatten()
+ self.0
+ .get(name)
+ .map(|CalendarListEntry { id_lww: (_, ident) }| *ident)
+ .flatten()
}
/// Check if a given calendar name exists
@@ -271,9 +275,7 @@ impl CalendarList {
(now_msec(), id)
}
}
- Some(CalendarListEntry {
- id_lww,
- }) => {
+ Some(CalendarListEntry { id_lww }) => {
if id_lww.1 == id {
// Entry is already equals to the requested id (Option<UniqueIdent)
// Nothing to do
@@ -281,20 +283,15 @@ impl CalendarList {
} else {
// Entry does not equal to what we know internally
// We update the Last Write Win CRDT here with the new id value
- (
- std::cmp::max(id_lww.0 + 1, now_msec()),
- id,
- )
+ (std::cmp::max(id_lww.0 + 1, now_msec()), id)
}
}
};
// If we did not return here, that's because we have to update
// something in our internal index.
- self.0.insert(
- name.into(),
- CalendarListEntry { id_lww: (ts, id) },
- );
+ self.0
+ .insert(name.into(), CalendarListEntry { id_lww: (ts, id) });
Some(())
}
diff --git a/aero-collections/src/davdag.rs b/aero-collections/src/davdag.rs
index 7335bdc..36a9016 100644
--- a/aero-collections/src/davdag.rs
+++ b/aero-collections/src/davdag.rs
@@ -1,6 +1,6 @@
use anyhow::{bail, Result};
+use im::{ordset, OrdMap, OrdSet};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
-use im::{OrdMap, OrdSet, ordset};
use aero_bayou::*;
@@ -26,7 +26,6 @@ pub struct DavDag {
pub idx_by_filename: OrdMap<FileName, BlobId>,
// ------------ Below this line, data is ephemeral, ie. not checkpointed
-
/// Partial synchronization graph
pub ancestors: OrdMap<Token, OrdSet<Token>>,
@@ -84,7 +83,7 @@ impl DavDag {
// HELPER functions
pub fn heads_vec(&self) -> Vec<Token> {
- self.heads.clone().into_iter().collect()
+ self.heads.clone().into_iter().collect()
}
/// A sync descriptor
@@ -99,7 +98,7 @@ impl DavDag {
// We can't capture all missing events if we are not connected
// to all sinks of the graph,
// ie. if we don't already know all the sinks,
- // ie. if we are missing so much history that
+ // ie. if we are missing so much history that
// the event log has been transformed into a checkpoint
if !self.origins.is_subset(already_known.clone()) {
bail!("Not enough history to produce a correct diff, a full resync is needed");
@@ -124,7 +123,7 @@ impl DavDag {
if all_known.insert(cursor).is_some() {
// Item already processed
- continue
+ continue;
}
// Collect parents
@@ -167,7 +166,8 @@ impl DavDag {
self.idx_by_filename.remove(filename);
// Record the change in the ephemeral synchronization map
- self.change.insert(sync_token, SyncChange::NotFound(filename.to_string()));
+ self.change
+ .insert(sync_token, SyncChange::NotFound(filename.to_string()));
// Finally clear item from the source of trust
self.table.remove(blob_id);
@@ -179,10 +179,13 @@ impl DavDag {
// --- Update ANCESTORS
// We register ancestors as it is required for the sync algorithm
- self.ancestors.insert(*child, parents.iter().fold(ordset![], |mut acc, p| {
- acc.insert(*p);
- acc
- }));
+ self.ancestors.insert(
+ *child,
+ parents.iter().fold(ordset![], |mut acc, p| {
+ acc.insert(*p);
+ acc
+ }),
+ );
// --- Update ORIGINS
// If this event has no parents, it's an origin
@@ -192,11 +195,13 @@ impl DavDag {
// --- Update HEADS
// Remove from HEADS this event's parents
- parents.iter().for_each(|par| { self.heads.remove(par); });
+ parents.iter().for_each(|par| {
+ self.heads.remove(par);
+ });
// This event becomes a new HEAD in turn
self.heads.insert(*child);
-
+
// --- Update ALL NODES
self.all_nodes.insert(*child);
}
@@ -217,16 +222,16 @@ impl BayouState for DavDag {
fn apply(&self, op: &Self::Op) -> Self {
let mut new = self.clone();
-
+
match op {
DavDagOp::Put(sync_desc, entry) => {
new.sync_dag(sync_desc);
new.register(Some(sync_desc.1), entry.clone());
- },
+ }
DavDagOp::Delete(sync_desc, blob_id) => {
new.sync_dag(sync_desc);
new.unregister(sync_desc.1, blob_id);
- },
+ }
DavDagOp::Merge(sync_desc) => {
new.sync_dag(sync_desc);
}
@@ -252,7 +257,9 @@ impl<'de> Deserialize<'de> for DavDag {
let mut davdag = DavDag::default();
// Build the table + index
- val.items.into_iter().for_each(|entry| davdag.register(None, entry));
+ val.items
+ .into_iter()
+ .for_each(|entry| davdag.register(None, entry));
// Initialize the synchronization DAG with its roots
val.heads.into_iter().for_each(|ident| {
diff --git a/aero-collections/src/lib.rs b/aero-collections/src/lib.rs
index ef8b8d8..eabf61c 100644
--- a/aero-collections/src/lib.rs
+++ b/aero-collections/src/lib.rs
@@ -1,5 +1,5 @@
-pub mod unique_ident;
+pub mod calendar;
pub mod davdag;
-pub mod user;
pub mod mail;
-pub mod calendar;
+pub mod unique_ident;
+pub mod user;
diff --git a/aero-collections/src/mail/incoming.rs b/aero-collections/src/mail/incoming.rs
index cd2f8fd..55c2515 100644
--- a/aero-collections/src/mail/incoming.rs
+++ b/aero-collections/src/mail/incoming.rs
@@ -8,16 +8,16 @@ use futures::{future::BoxFuture, FutureExt};
use tokio::sync::watch;
use tracing::{debug, error, info, warn};
+use aero_bayou::timestamp::now_msec;
use aero_user::cryptoblob;
use aero_user::login::{Credentials, PublicCredentials};
use aero_user::storage;
-use aero_bayou::timestamp::now_msec;
use crate::mail::mailbox::Mailbox;
use crate::mail::uidindex::ImapUidvalidity;
+use crate::mail::IMF;
use crate::unique_ident::*;
use crate::user::User;
-use crate::mail::IMF;
const INCOMING_PK: &str = "incoming";
const INCOMING_LOCK_SK: &str = "lock";
diff --git a/aero-collections/src/mail/mailbox.rs b/aero-collections/src/mail/mailbox.rs
index fcdb21e..bec9669 100644
--- a/aero-collections/src/mail/mailbox.rs
+++ b/aero-collections/src/mail/mailbox.rs
@@ -2,15 +2,15 @@ use anyhow::{anyhow, bail, Result};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
+use aero_bayou::timestamp::now_msec;
+use aero_bayou::Bayou;
use aero_user::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key};
use aero_user::login::Credentials;
use aero_user::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store};
-use aero_bayou::Bayou;
-use aero_bayou::timestamp::now_msec;
-use crate::unique_ident::*;
use crate::mail::uidindex::*;
use crate::mail::IMF;
+use crate::unique_ident::*;
pub struct Mailbox {
pub(super) id: UniqueIdent,
diff --git a/aero-collections/src/mail/mod.rs b/aero-collections/src/mail/mod.rs
index ca9b08b..584a9eb 100644
--- a/aero-collections/src/mail/mod.rs
+++ b/aero-collections/src/mail/mod.rs
@@ -1,9 +1,9 @@
pub mod incoming;
pub mod mailbox;
+pub mod namespace;
pub mod query;
pub mod snapshot;
pub mod uidindex;
-pub mod namespace;
// Internet Message Format
// aka RFC 822 - RFC 2822 - RFC 5322
diff --git a/aero-collections/src/mail/namespace.rs b/aero-collections/src/mail/namespace.rs
index b1f6a70..0f1db7d 100644
--- a/aero-collections/src/mail/namespace.rs
+++ b/aero-collections/src/mail/namespace.rs
@@ -104,7 +104,11 @@ impl MailboxList {
/// Ensures mailbox `name` maps to id `id`.
/// If it already mapped to that, returns None.
/// If a change had to be done, returns Some(new uidvalidity in mailbox).
- pub(crate) fn set_mailbox(&mut self, name: &str, id: Option<UniqueIdent>) -> Option<ImapUidvalidity> {
+ pub(crate) fn set_mailbox(
+ &mut self,
+ name: &str,
+ id: Option<UniqueIdent>,
+ ) -> Option<ImapUidvalidity> {
let (ts, id, uidvalidity) = match self.0.get_mut(name) {
None => {
if id.is_none() {
diff --git a/aero-collections/src/mail/snapshot.rs b/aero-collections/src/mail/snapshot.rs
index 9503d4d..6f8a8a8 100644
--- a/aero-collections/src/mail/snapshot.rs
+++ b/aero-collections/src/mail/snapshot.rs
@@ -2,10 +2,10 @@ use std::sync::Arc;
use anyhow::Result;
-use crate::unique_ident::UniqueIdent;
use super::mailbox::Mailbox;
use super::query::{Query, QueryScope};
use super::uidindex::UidIndex;
+use crate::unique_ident::UniqueIdent;
/// A Frozen Mailbox has a snapshot of the current mailbox
/// state that is desynchronized with the real mailbox state.
diff --git a/aero-collections/src/mail/uidindex.rs b/aero-collections/src/mail/uidindex.rs
index ca975a3..6df3206 100644
--- a/aero-collections/src/mail/uidindex.rs
+++ b/aero-collections/src/mail/uidindex.rs
@@ -3,8 +3,8 @@ use std::num::{NonZeroU32, NonZeroU64};
use im::{HashMap, OrdMap, OrdSet};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
-use aero_bayou::*;
use crate::unique_ident::UniqueIdent;
+use aero_bayou::*;
pub type ModSeq = NonZeroU64;
pub type ImapUid = NonZeroU32;
diff --git a/aero-collections/src/user.rs b/aero-collections/src/user.rs
index 9ed342f..f125c46 100644
--- a/aero-collections/src/user.rs
+++ b/aero-collections/src/user.rs
@@ -9,12 +9,15 @@ use aero_user::cryptoblob::{open_deserialize, seal_serialize};
use aero_user::login::Credentials;
use aero_user::storage;
+use crate::calendar::namespace::CalendarNs;
use crate::mail::incoming::incoming_mail_watch_process;
use crate::mail::mailbox::Mailbox;
+use crate::mail::namespace::{
+ CreatedMailbox, MailboxList, ARCHIVE, DRAFTS, INBOX, MAILBOX_HIERARCHY_DELIMITER,
+ MAILBOX_LIST_PK, MAILBOX_LIST_SK, SENT, TRASH,
+};
use crate::mail::uidindex::ImapUidvalidity;
use crate::unique_ident::UniqueIdent;
-use crate::mail::namespace::{MAILBOX_HIERARCHY_DELIMITER, INBOX, DRAFTS, ARCHIVE, SENT, TRASH, MAILBOX_LIST_PK, MAILBOX_LIST_SK,MailboxList,CreatedMailbox};
-use crate::calendar::namespace::CalendarNs;
//@FIXME User should be totally rewriten
// to extract the local mailbox list