diff options
Diffstat (limited to 'aero-collections')
-rw-r--r-- | aero-collections/src/calendar/mod.rs | 17 | ||||
-rw-r--r-- | aero-collections/src/calendar/namespace.rs | 43 | ||||
-rw-r--r-- | aero-collections/src/davdag.rs | 39 | ||||
-rw-r--r-- | aero-collections/src/lib.rs | 6 | ||||
-rw-r--r-- | aero-collections/src/mail/incoming.rs | 4 | ||||
-rw-r--r-- | aero-collections/src/mail/mailbox.rs | 6 | ||||
-rw-r--r-- | aero-collections/src/mail/mod.rs | 2 | ||||
-rw-r--r-- | aero-collections/src/mail/namespace.rs | 6 | ||||
-rw-r--r-- | aero-collections/src/mail/snapshot.rs | 2 | ||||
-rw-r--r-- | aero-collections/src/mail/uidindex.rs | 2 | ||||
-rw-r--r-- | aero-collections/src/user.rs | 7 |
11 files changed, 70 insertions, 64 deletions
diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs index 028cf87..cd05328 100644 --- a/aero-collections/src/calendar/mod.rs +++ b/aero-collections/src/calendar/mod.rs @@ -4,12 +4,12 @@ use anyhow::{anyhow, bail, Result}; use tokio::sync::RwLock; use aero_bayou::Bayou; -use aero_user::login::Credentials; use aero_user::cryptoblob::{self, gen_key, Key}; +use aero_user::login::Credentials; use aero_user::storage::{self, BlobRef, BlobVal, Store}; +use crate::davdag::{BlobId, DavDag, IndexEntry, SyncChange, Token}; use crate::unique_ident::*; -use crate::davdag::{DavDag, IndexEntry, Token, BlobId, SyncChange}; pub struct Calendar { pub(super) id: UniqueIdent, @@ -17,10 +17,7 @@ pub struct Calendar { } impl Calendar { - pub(crate) async fn open( - creds: &Credentials, - id: UniqueIdent, - ) -> Result<Self> { + pub(crate) async fn open(creds: &Credentials, id: UniqueIdent) -> Result<Self> { let bayou_path = format!("calendar/dag/{}", id); let cal_path = format!("calendar/events/{}", id); @@ -126,7 +123,7 @@ impl CalendarInternal { async fn put<'a>(&mut self, name: &str, evt: &'a [u8]) -> Result<(Token, IndexEntry)> { let message_key = gen_key(); let blob_id = gen_ident(); - + let encrypted_msg_key = cryptoblob::seal(&message_key.as_ref(), &self.encryption_key)?; let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_msg_key); @@ -138,9 +135,7 @@ impl CalendarInternal { ) .with_meta(MESSAGE_KEY.to_string(), key_header); - let etag = self.storage - .blob_insert(blob_val) - .await?; + let etag = self.storage.blob_insert(blob_val).await?; // Add entry to Bayou let entry: IndexEntry = (blob_id, name.to_string(), etag); @@ -181,7 +176,7 @@ impl CalendarInternal { let heads = davstate.heads_vec(); let token = match heads.as_slice() { - [ token ] => *token, + [token] => *token, _ => { let op_mg = davstate.op_merge(); let token = op_mg.token(); diff --git a/aero-collections/src/calendar/namespace.rs b/aero-collections/src/calendar/namespace.rs index 9c21d19..db65703 100644 --- a/aero-collections/src/calendar/namespace.rs +++ b/aero-collections/src/calendar/namespace.rs @@ -1,16 +1,16 @@ use anyhow::{bail, Result}; -use std::collections::{HashMap, BTreeMap}; -use std::sync::{Weak, Arc}; +use std::collections::{BTreeMap, HashMap}; +use std::sync::{Arc, Weak}; use serde::{Deserialize, Serialize}; use aero_bayou::timestamp::now_msec; -use aero_user::storage; use aero_user::cryptoblob::{open_deserialize, seal_serialize}; +use aero_user::storage; +use super::Calendar; use crate::unique_ident::{gen_ident, UniqueIdent}; use crate::user::User; -use super::Calendar; pub(crate) const CAL_LIST_PK: &str = "calendars"; pub(crate) const CAL_LIST_SK: &str = "list"; @@ -46,7 +46,7 @@ impl CalendarNs { } let cal = Arc::new(Calendar::open(&user.creds, id).await?); - + let mut cache = self.0.lock().unwrap(); if let Some(concurrent_cal) = cache.get(&id).and_then(Weak::upgrade) { drop(cal); // we worked for nothing but at least we didn't starve someone else @@ -117,13 +117,15 @@ impl CalendarNs { CalendarExists::Created(_) => (), } list.save(user, ct).await?; - + Ok(()) } /// Has calendar pub async fn has(&self, user: &Arc<User>, name: &str) -> Result<bool> { - CalendarList::load(user).await.map(|(list, _)| list.has(name)) + CalendarList::load(user) + .await + .map(|(list, _)| list.has(name)) } } @@ -161,7 +163,8 @@ impl CalendarList { for v in row_vals { if let storage::Alternative::Value(vbytes) = v { - let list2 = open_deserialize::<CalendarList>(&vbytes, &user.creds.keys.master)?; + let list2 = + open_deserialize::<CalendarList>(&vbytes, &user.creds.keys.master)?; list.merge(list2); } } @@ -200,7 +203,7 @@ impl CalendarList { /// (Don't forget to save if it returns CalendarExists::Created) fn create(&mut self, name: &str) -> CalendarExists { if let Some(CalendarListEntry { - id_lww: (_, Some(id)) + id_lww: (_, Some(id)), }) = self.0.get(name) { return CalendarExists::Existed(*id); @@ -222,9 +225,10 @@ impl CalendarList { /// For a given calendar name, get its Unique Identifier fn get(&self, name: &str) -> Option<UniqueIdent> { - self.0.get(name).map(|CalendarListEntry { - id_lww: (_, ident), - }| *ident).flatten() + self.0 + .get(name) + .map(|CalendarListEntry { id_lww: (_, ident) }| *ident) + .flatten() } /// Check if a given calendar name exists @@ -271,9 +275,7 @@ impl CalendarList { (now_msec(), id) } } - Some(CalendarListEntry { - id_lww, - }) => { + Some(CalendarListEntry { id_lww }) => { if id_lww.1 == id { // Entry is already equals to the requested id (Option<UniqueIdent) // Nothing to do @@ -281,20 +283,15 @@ impl CalendarList { } else { // Entry does not equal to what we know internally // We update the Last Write Win CRDT here with the new id value - ( - std::cmp::max(id_lww.0 + 1, now_msec()), - id, - ) + (std::cmp::max(id_lww.0 + 1, now_msec()), id) } } }; // If we did not return here, that's because we have to update // something in our internal index. - self.0.insert( - name.into(), - CalendarListEntry { id_lww: (ts, id) }, - ); + self.0 + .insert(name.into(), CalendarListEntry { id_lww: (ts, id) }); Some(()) } diff --git a/aero-collections/src/davdag.rs b/aero-collections/src/davdag.rs index 7335bdc..36a9016 100644 --- a/aero-collections/src/davdag.rs +++ b/aero-collections/src/davdag.rs @@ -1,6 +1,6 @@ use anyhow::{bail, Result}; +use im::{ordset, OrdMap, OrdSet}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use im::{OrdMap, OrdSet, ordset}; use aero_bayou::*; @@ -26,7 +26,6 @@ pub struct DavDag { pub idx_by_filename: OrdMap<FileName, BlobId>, // ------------ Below this line, data is ephemeral, ie. not checkpointed - /// Partial synchronization graph pub ancestors: OrdMap<Token, OrdSet<Token>>, @@ -84,7 +83,7 @@ impl DavDag { // HELPER functions pub fn heads_vec(&self) -> Vec<Token> { - self.heads.clone().into_iter().collect() + self.heads.clone().into_iter().collect() } /// A sync descriptor @@ -99,7 +98,7 @@ impl DavDag { // We can't capture all missing events if we are not connected // to all sinks of the graph, // ie. if we don't already know all the sinks, - // ie. if we are missing so much history that + // ie. if we are missing so much history that // the event log has been transformed into a checkpoint if !self.origins.is_subset(already_known.clone()) { bail!("Not enough history to produce a correct diff, a full resync is needed"); @@ -124,7 +123,7 @@ impl DavDag { if all_known.insert(cursor).is_some() { // Item already processed - continue + continue; } // Collect parents @@ -167,7 +166,8 @@ impl DavDag { self.idx_by_filename.remove(filename); // Record the change in the ephemeral synchronization map - self.change.insert(sync_token, SyncChange::NotFound(filename.to_string())); + self.change + .insert(sync_token, SyncChange::NotFound(filename.to_string())); // Finally clear item from the source of trust self.table.remove(blob_id); @@ -179,10 +179,13 @@ impl DavDag { // --- Update ANCESTORS // We register ancestors as it is required for the sync algorithm - self.ancestors.insert(*child, parents.iter().fold(ordset![], |mut acc, p| { - acc.insert(*p); - acc - })); + self.ancestors.insert( + *child, + parents.iter().fold(ordset![], |mut acc, p| { + acc.insert(*p); + acc + }), + ); // --- Update ORIGINS // If this event has no parents, it's an origin @@ -192,11 +195,13 @@ impl DavDag { // --- Update HEADS // Remove from HEADS this event's parents - parents.iter().for_each(|par| { self.heads.remove(par); }); + parents.iter().for_each(|par| { + self.heads.remove(par); + }); // This event becomes a new HEAD in turn self.heads.insert(*child); - + // --- Update ALL NODES self.all_nodes.insert(*child); } @@ -217,16 +222,16 @@ impl BayouState for DavDag { fn apply(&self, op: &Self::Op) -> Self { let mut new = self.clone(); - + match op { DavDagOp::Put(sync_desc, entry) => { new.sync_dag(sync_desc); new.register(Some(sync_desc.1), entry.clone()); - }, + } DavDagOp::Delete(sync_desc, blob_id) => { new.sync_dag(sync_desc); new.unregister(sync_desc.1, blob_id); - }, + } DavDagOp::Merge(sync_desc) => { new.sync_dag(sync_desc); } @@ -252,7 +257,9 @@ impl<'de> Deserialize<'de> for DavDag { let mut davdag = DavDag::default(); // Build the table + index - val.items.into_iter().for_each(|entry| davdag.register(None, entry)); + val.items + .into_iter() + .for_each(|entry| davdag.register(None, entry)); // Initialize the synchronization DAG with its roots val.heads.into_iter().for_each(|ident| { diff --git a/aero-collections/src/lib.rs b/aero-collections/src/lib.rs index ef8b8d8..eabf61c 100644 --- a/aero-collections/src/lib.rs +++ b/aero-collections/src/lib.rs @@ -1,5 +1,5 @@ -pub mod unique_ident; +pub mod calendar; pub mod davdag; -pub mod user; pub mod mail; -pub mod calendar; +pub mod unique_ident; +pub mod user; diff --git a/aero-collections/src/mail/incoming.rs b/aero-collections/src/mail/incoming.rs index cd2f8fd..55c2515 100644 --- a/aero-collections/src/mail/incoming.rs +++ b/aero-collections/src/mail/incoming.rs @@ -8,16 +8,16 @@ use futures::{future::BoxFuture, FutureExt}; use tokio::sync::watch; use tracing::{debug, error, info, warn}; +use aero_bayou::timestamp::now_msec; use aero_user::cryptoblob; use aero_user::login::{Credentials, PublicCredentials}; use aero_user::storage; -use aero_bayou::timestamp::now_msec; use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; +use crate::mail::IMF; use crate::unique_ident::*; use crate::user::User; -use crate::mail::IMF; const INCOMING_PK: &str = "incoming"; const INCOMING_LOCK_SK: &str = "lock"; diff --git a/aero-collections/src/mail/mailbox.rs b/aero-collections/src/mail/mailbox.rs index fcdb21e..bec9669 100644 --- a/aero-collections/src/mail/mailbox.rs +++ b/aero-collections/src/mail/mailbox.rs @@ -2,15 +2,15 @@ use anyhow::{anyhow, bail, Result}; use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; +use aero_bayou::timestamp::now_msec; +use aero_bayou::Bayou; use aero_user::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key}; use aero_user::login::Credentials; use aero_user::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store}; -use aero_bayou::Bayou; -use aero_bayou::timestamp::now_msec; -use crate::unique_ident::*; use crate::mail::uidindex::*; use crate::mail::IMF; +use crate::unique_ident::*; pub struct Mailbox { pub(super) id: UniqueIdent, diff --git a/aero-collections/src/mail/mod.rs b/aero-collections/src/mail/mod.rs index ca9b08b..584a9eb 100644 --- a/aero-collections/src/mail/mod.rs +++ b/aero-collections/src/mail/mod.rs @@ -1,9 +1,9 @@ pub mod incoming; pub mod mailbox; +pub mod namespace; pub mod query; pub mod snapshot; pub mod uidindex; -pub mod namespace; // Internet Message Format // aka RFC 822 - RFC 2822 - RFC 5322 diff --git a/aero-collections/src/mail/namespace.rs b/aero-collections/src/mail/namespace.rs index b1f6a70..0f1db7d 100644 --- a/aero-collections/src/mail/namespace.rs +++ b/aero-collections/src/mail/namespace.rs @@ -104,7 +104,11 @@ impl MailboxList { /// Ensures mailbox `name` maps to id `id`. /// If it already mapped to that, returns None. /// If a change had to be done, returns Some(new uidvalidity in mailbox). - pub(crate) fn set_mailbox(&mut self, name: &str, id: Option<UniqueIdent>) -> Option<ImapUidvalidity> { + pub(crate) fn set_mailbox( + &mut self, + name: &str, + id: Option<UniqueIdent>, + ) -> Option<ImapUidvalidity> { let (ts, id, uidvalidity) = match self.0.get_mut(name) { None => { if id.is_none() { diff --git a/aero-collections/src/mail/snapshot.rs b/aero-collections/src/mail/snapshot.rs index 9503d4d..6f8a8a8 100644 --- a/aero-collections/src/mail/snapshot.rs +++ b/aero-collections/src/mail/snapshot.rs @@ -2,10 +2,10 @@ use std::sync::Arc; use anyhow::Result; -use crate::unique_ident::UniqueIdent; use super::mailbox::Mailbox; use super::query::{Query, QueryScope}; use super::uidindex::UidIndex; +use crate::unique_ident::UniqueIdent; /// A Frozen Mailbox has a snapshot of the current mailbox /// state that is desynchronized with the real mailbox state. diff --git a/aero-collections/src/mail/uidindex.rs b/aero-collections/src/mail/uidindex.rs index ca975a3..6df3206 100644 --- a/aero-collections/src/mail/uidindex.rs +++ b/aero-collections/src/mail/uidindex.rs @@ -3,8 +3,8 @@ use std::num::{NonZeroU32, NonZeroU64}; use im::{HashMap, OrdMap, OrdSet}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use aero_bayou::*; use crate::unique_ident::UniqueIdent; +use aero_bayou::*; pub type ModSeq = NonZeroU64; pub type ImapUid = NonZeroU32; diff --git a/aero-collections/src/user.rs b/aero-collections/src/user.rs index 9ed342f..f125c46 100644 --- a/aero-collections/src/user.rs +++ b/aero-collections/src/user.rs @@ -9,12 +9,15 @@ use aero_user::cryptoblob::{open_deserialize, seal_serialize}; use aero_user::login::Credentials; use aero_user::storage; +use crate::calendar::namespace::CalendarNs; use crate::mail::incoming::incoming_mail_watch_process; use crate::mail::mailbox::Mailbox; +use crate::mail::namespace::{ + CreatedMailbox, MailboxList, ARCHIVE, DRAFTS, INBOX, MAILBOX_HIERARCHY_DELIMITER, + MAILBOX_LIST_PK, MAILBOX_LIST_SK, SENT, TRASH, +}; use crate::mail::uidindex::ImapUidvalidity; use crate::unique_ident::UniqueIdent; -use crate::mail::namespace::{MAILBOX_HIERARCHY_DELIMITER, INBOX, DRAFTS, ARCHIVE, SENT, TRASH, MAILBOX_LIST_PK, MAILBOX_LIST_SK,MailboxList,CreatedMailbox}; -use crate::calendar::namespace::CalendarNs; //@FIXME User should be totally rewriten // to extract the local mailbox list |