aboutsummaryrefslogtreecommitdiff
path: root/aero-collections/src/calendar
diff options
context:
space:
mode:
Diffstat (limited to 'aero-collections/src/calendar')
-rw-r--r--aero-collections/src/calendar/mod.rs204
-rw-r--r--aero-collections/src/calendar/namespace.rs324
2 files changed, 528 insertions, 0 deletions
diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs
new file mode 100644
index 0000000..ac07842
--- /dev/null
+++ b/aero-collections/src/calendar/mod.rs
@@ -0,0 +1,204 @@
+pub mod namespace;
+
+use anyhow::{anyhow, bail, Result};
+use tokio::sync::RwLock;
+
+use aero_bayou::Bayou;
+use aero_user::cryptoblob::{self, gen_key, Key};
+use aero_user::login::Credentials;
+use aero_user::storage::{self, BlobRef, BlobVal, Store};
+
+use crate::davdag::{BlobId, DavDag, IndexEntry, SyncChange, Token};
+use crate::unique_ident::*;
+
+pub struct Calendar {
+ pub(super) id: UniqueIdent,
+ internal: RwLock<CalendarInternal>,
+}
+
+impl Calendar {
+ pub(crate) async fn open(creds: &Credentials, id: UniqueIdent) -> Result<Self> {
+ let bayou_path = format!("calendar/dag/{}", id);
+ let cal_path = format!("calendar/events/{}", id);
+
+ let mut davdag = Bayou::<DavDag>::new(creds, bayou_path).await?;
+ davdag.sync().await?;
+
+ let internal = RwLock::new(CalendarInternal {
+ id,
+ encryption_key: creds.keys.master.clone(),
+ storage: creds.storage.build().await?,
+ davdag,
+ cal_path,
+ });
+
+ Ok(Self { id, internal })
+ }
+
+ // ---- DAG sync utilities
+
+ /// Sync data with backing store
+ pub async fn force_sync(&self) -> Result<()> {
+ self.internal.write().await.force_sync().await
+ }
+
+ /// Sync data with backing store only if changes are detected
+ /// or last sync is too old
+ pub async fn opportunistic_sync(&self) -> Result<()> {
+ self.internal.write().await.opportunistic_sync().await
+ }
+
+ // ---- Data API
+
+ /// Access the DAG internal data (you can get the list of files for example)
+ pub async fn dag(&self) -> DavDag {
+ // Cloning is cheap
+ self.internal.read().await.davdag.state().clone()
+ }
+
+ /// Access the current token
+ pub async fn token(&self) -> Result<Token> {
+ self.internal.write().await.current_token().await
+ }
+
+ /// The diff API is a write API as we might need to push a merge node
+ /// to get a new sync token
+ pub async fn diff(&self, sync_token: Token) -> Result<(Token, Vec<SyncChange>)> {
+ self.internal.write().await.diff(sync_token).await
+ }
+
+ /// Get a specific event
+ pub async fn get(&self, evt_id: UniqueIdent) -> Result<Vec<u8>> {
+ self.internal.read().await.get(evt_id).await
+ }
+
+ /// Put a specific event
+ pub async fn put<'a>(&self, name: &str, evt: &'a [u8]) -> Result<(Token, IndexEntry)> {
+ self.internal.write().await.put(name, evt).await
+ }
+
+ /// Delete a specific event
+ pub async fn delete(&self, blob_id: UniqueIdent) -> Result<Token> {
+ self.internal.write().await.delete(blob_id).await
+ }
+}
+
+use base64::Engine;
+const MESSAGE_KEY: &str = "message-key";
+struct CalendarInternal {
+ #[allow(dead_code)]
+ id: UniqueIdent,
+ cal_path: String,
+ encryption_key: Key,
+ storage: Store,
+ davdag: Bayou<DavDag>,
+}
+
+impl CalendarInternal {
+ async fn force_sync(&mut self) -> Result<()> {
+ self.davdag.sync().await?;
+ Ok(())
+ }
+
+ async fn opportunistic_sync(&mut self) -> Result<()> {
+ self.davdag.opportunistic_sync().await?;
+ Ok(())
+ }
+
+ async fn get(&self, blob_id: BlobId) -> Result<Vec<u8>> {
+ // Fetch message from S3
+ let blob_ref = storage::BlobRef(format!("{}/{}", self.cal_path, blob_id));
+ let object = self.storage.blob_fetch(&blob_ref).await?;
+
+ // Decrypt message key from headers
+ let key_encrypted_b64 = object
+ .meta
+ .get(MESSAGE_KEY)
+ .ok_or(anyhow!("Missing key in metadata"))?;
+ let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?;
+ let message_key_raw = cryptoblob::open(&key_encrypted, &self.encryption_key)?;
+ let message_key =
+ cryptoblob::Key::from_slice(&message_key_raw).ok_or(anyhow!("Invalid message key"))?;
+
+ // Decrypt body
+ let body = object.value;
+ cryptoblob::open(&body, &message_key)
+ }
+
+ async fn put<'a>(&mut self, name: &str, evt: &'a [u8]) -> Result<(Token, IndexEntry)> {
+ let message_key = gen_key();
+ let blob_id = gen_ident();
+
+ let encrypted_msg_key = cryptoblob::seal(&message_key.as_ref(), &self.encryption_key)?;
+ let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_msg_key);
+
+ // Write event to S3
+ let message_blob = cryptoblob::seal(evt, &message_key)?;
+ let blob_val = BlobVal::new(
+ BlobRef(format!("{}/{}", self.cal_path, blob_id)),
+ message_blob,
+ )
+ .with_meta(MESSAGE_KEY.to_string(), key_header);
+
+ let etag = self.storage.blob_insert(blob_val).await?;
+
+ // Add entry to Bayou
+ let entry: IndexEntry = (blob_id, name.to_string(), etag);
+ let davstate = self.davdag.state();
+ let put_op = davstate.op_put(entry.clone());
+ let token = put_op.token();
+ self.davdag.push(put_op).await?;
+
+ Ok((token, entry))
+ }
+
+ async fn delete(&mut self, blob_id: BlobId) -> Result<Token> {
+ let davstate = self.davdag.state();
+
+ if !davstate.table.contains_key(&blob_id) {
+ bail!("Cannot delete event that doesn't exist");
+ }
+
+ let del_op = davstate.op_delete(blob_id);
+ let token = del_op.token();
+ self.davdag.push(del_op).await?;
+
+ let blob_ref = BlobRef(format!("{}/{}", self.cal_path, blob_id));
+ self.storage.blob_rm(&blob_ref).await?;
+
+ Ok(token)
+ }
+
+ async fn diff(&mut self, sync_token: Token) -> Result<(Token, Vec<SyncChange>)> {
+ let davstate = self.davdag.state();
+
+ let token_changed = davstate.resolve(sync_token)?;
+ let changes = token_changed
+ .iter()
+ .filter_map(|t: &Token| davstate.change.get(t))
+ .map(|s| s.clone())
+ .filter(|s| match s {
+ SyncChange::Ok((filename, _)) => davstate.idx_by_filename.get(filename).is_some(),
+ SyncChange::NotFound(filename) => davstate.idx_by_filename.get(filename).is_none(),
+ })
+ .collect();
+
+ let token = self.current_token().await?;
+ Ok((token, changes))
+ }
+
+ async fn current_token(&mut self) -> Result<Token> {
+ let davstate = self.davdag.state();
+ let heads = davstate.heads_vec();
+ let token = match heads.as_slice() {
+ [token] => *token,
+ _ => {
+ let op_mg = davstate.op_merge();
+ let token = op_mg.token();
+ self.davdag.push(op_mg).await?;
+ token
+ }
+ };
+ Ok(token)
+ }
+}
diff --git a/aero-collections/src/calendar/namespace.rs b/aero-collections/src/calendar/namespace.rs
new file mode 100644
index 0000000..db65703
--- /dev/null
+++ b/aero-collections/src/calendar/namespace.rs
@@ -0,0 +1,324 @@
+use anyhow::{bail, Result};
+use std::collections::{BTreeMap, HashMap};
+use std::sync::{Arc, Weak};
+
+use serde::{Deserialize, Serialize};
+
+use aero_bayou::timestamp::now_msec;
+use aero_user::cryptoblob::{open_deserialize, seal_serialize};
+use aero_user::storage;
+
+use super::Calendar;
+use crate::unique_ident::{gen_ident, UniqueIdent};
+use crate::user::User;
+
+pub(crate) const CAL_LIST_PK: &str = "calendars";
+pub(crate) const CAL_LIST_SK: &str = "list";
+pub(crate) const MAIN_CAL: &str = "Personal";
+pub(crate) const MAX_CALNAME_CHARS: usize = 32;
+
+pub struct CalendarNs(std::sync::Mutex<HashMap<UniqueIdent, Weak<Calendar>>>);
+
+impl CalendarNs {
+ /// Create a new calendar namespace
+ pub fn new() -> Self {
+ Self(std::sync::Mutex::new(HashMap::new()))
+ }
+
+ /// Open a calendar by name
+ pub async fn open(&self, user: &Arc<User>, name: &str) -> Result<Option<Arc<Calendar>>> {
+ let (list, _ct) = CalendarList::load(user).await?;
+
+ match list.get(name) {
+ None => Ok(None),
+ Some(ident) => Ok(Some(self.open_by_id(user, ident).await?)),
+ }
+ }
+
+ /// Open a calendar by unique id
+ /// Check user.rs::open_mailbox_by_id to understand this function
+ pub async fn open_by_id(&self, user: &Arc<User>, id: UniqueIdent) -> Result<Arc<Calendar>> {
+ {
+ let cache = self.0.lock().unwrap();
+ if let Some(cal) = cache.get(&id).and_then(Weak::upgrade) {
+ return Ok(cal);
+ }
+ }
+
+ let cal = Arc::new(Calendar::open(&user.creds, id).await?);
+
+ let mut cache = self.0.lock().unwrap();
+ if let Some(concurrent_cal) = cache.get(&id).and_then(Weak::upgrade) {
+ drop(cal); // we worked for nothing but at least we didn't starve someone else
+ Ok(concurrent_cal)
+ } else {
+ cache.insert(id, Arc::downgrade(&cal));
+ Ok(cal)
+ }
+ }
+
+ /// List calendars
+ pub async fn list(&self, user: &Arc<User>) -> Result<Vec<String>> {
+ CalendarList::load(user).await.map(|(list, _)| list.names())
+ }
+
+ /// Delete a calendar from the index
+ pub async fn delete(&self, user: &Arc<User>, name: &str) -> Result<()> {
+ // We currently assume that main cal is a bit specific
+ if name == MAIN_CAL {
+ bail!("Cannot delete main calendar");
+ }
+
+ let (mut list, ct) = CalendarList::load(user).await?;
+ if list.has(name) {
+ //@TODO: actually delete calendar content
+ list.bind(name, None);
+ list.save(user, ct).await?;
+ Ok(())
+ } else {
+ bail!("Calendar {} does not exist", name);
+ }
+ }
+
+ /// Rename a calendar in the index
+ pub async fn rename(&self, user: &Arc<User>, old: &str, new: &str) -> Result<()> {
+ if old == MAIN_CAL {
+ bail!("Renaming main calendar is not supported currently");
+ }
+ if !new.chars().all(char::is_alphanumeric) {
+ bail!("Unsupported characters in new calendar name, only alphanumeric characters are allowed currently");
+ }
+ if new.len() > MAX_CALNAME_CHARS {
+ bail!("Calendar name can't contain more than 32 characters");
+ }
+
+ let (mut list, ct) = CalendarList::load(user).await?;
+ list.rename(old, new)?;
+ list.save(user, ct).await?;
+
+ Ok(())
+ }
+
+ /// Create calendar
+ pub async fn create(&self, user: &Arc<User>, name: &str) -> Result<()> {
+ if name == MAIN_CAL {
+ bail!("Main calendar is automatically created, can't create it manually");
+ }
+ if !name.chars().all(char::is_alphanumeric) {
+ bail!("Unsupported characters in new calendar name, only alphanumeric characters are allowed");
+ }
+ if name.len() > MAX_CALNAME_CHARS {
+ bail!("Calendar name can't contain more than 32 characters");
+ }
+
+ let (mut list, ct) = CalendarList::load(user).await?;
+ match list.create(name) {
+ CalendarExists::Existed(_) => bail!("Calendar {} already exists", name),
+ CalendarExists::Created(_) => (),
+ }
+ list.save(user, ct).await?;
+
+ Ok(())
+ }
+
+ /// Has calendar
+ pub async fn has(&self, user: &Arc<User>, name: &str) -> Result<bool> {
+ CalendarList::load(user)
+ .await
+ .map(|(list, _)| list.has(name))
+ }
+}
+
+// ------
+// ------ From this point, implementation is hidden from the rest of the crate
+// ------
+
+#[derive(Serialize, Deserialize)]
+struct CalendarList(BTreeMap<String, CalendarListEntry>);
+
+#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
+struct CalendarListEntry {
+ id_lww: (u64, Option<UniqueIdent>),
+}
+
+impl CalendarList {
+ // ---- Index persistence related functions
+
+ /// Load from storage
+ async fn load(user: &Arc<User>) -> Result<(Self, Option<storage::RowRef>)> {
+ let row_ref = storage::RowRef::new(CAL_LIST_PK, CAL_LIST_SK);
+ let (mut list, row) = match user
+ .storage
+ .row_fetch(&storage::Selector::Single(&row_ref))
+ .await
+ {
+ Err(storage::StorageError::NotFound) => (Self::new(), None),
+ Err(e) => return Err(e.into()),
+ Ok(rv) => {
+ let mut list = Self::new();
+ let (row_ref, row_vals) = match rv.into_iter().next() {
+ Some(row_val) => (row_val.row_ref, row_val.value),
+ None => (row_ref, vec![]),
+ };
+
+ for v in row_vals {
+ if let storage::Alternative::Value(vbytes) = v {
+ let list2 =
+ open_deserialize::<CalendarList>(&vbytes, &user.creds.keys.master)?;
+ list.merge(list2);
+ }
+ }
+ (list, Some(row_ref))
+ }
+ };
+
+ // Create default calendars (currently only one calendar is created)
+ let is_default_cal_missing = [MAIN_CAL]
+ .iter()
+ .map(|calname| list.create(calname))
+ .fold(false, |acc, r| {
+ acc || matches!(r, CalendarExists::Created(..))
+ });
+
+ // Save the index if we created a new calendar
+ if is_default_cal_missing {
+ list.save(user, row.clone()).await?;
+ }
+
+ Ok((list, row))
+ }
+
+ /// Save an updated index
+ async fn save(&self, user: &Arc<User>, ct: Option<storage::RowRef>) -> Result<()> {
+ let list_blob = seal_serialize(self, &user.creds.keys.master)?;
+ let rref = ct.unwrap_or(storage::RowRef::new(CAL_LIST_PK, CAL_LIST_SK));
+ let row_val = storage::RowVal::new(rref, list_blob);
+ user.storage.row_insert(vec![row_val]).await?;
+ Ok(())
+ }
+
+ // ----- Index manipulation functions
+
+ /// Ensure that a given calendar exists
+ /// (Don't forget to save if it returns CalendarExists::Created)
+ fn create(&mut self, name: &str) -> CalendarExists {
+ if let Some(CalendarListEntry {
+ id_lww: (_, Some(id)),
+ }) = self.0.get(name)
+ {
+ return CalendarExists::Existed(*id);
+ }
+
+ let id = gen_ident();
+ self.bind(name, Some(id)).unwrap();
+ CalendarExists::Created(id)
+ }
+
+ /// Get a list of all calendar names
+ fn names(&self) -> Vec<String> {
+ self.0
+ .iter()
+ .filter(|(_, v)| v.id_lww.1.is_some())
+ .map(|(k, _)| k.to_string())
+ .collect()
+ }
+
+ /// For a given calendar name, get its Unique Identifier
+ fn get(&self, name: &str) -> Option<UniqueIdent> {
+ self.0
+ .get(name)
+ .map(|CalendarListEntry { id_lww: (_, ident) }| *ident)
+ .flatten()
+ }
+
+ /// Check if a given calendar name exists
+ fn has(&self, name: &str) -> bool {
+ self.get(name).is_some()
+ }
+
+ /// Rename a calendar
+ fn rename(&mut self, old: &str, new: &str) -> Result<()> {
+ if self.has(new) {
+ bail!("Calendar {} already exists", new);
+ }
+ let ident = match self.get(old) {
+ None => bail!("Calendar {} does not exist", old),
+ Some(ident) => ident,
+ };
+
+ self.bind(old, None);
+ self.bind(new, Some(ident));
+
+ Ok(())
+ }
+
+ // ----- Internal logic
+
+ /// New is not publicly exposed, use `load` instead
+ fn new() -> Self {
+ Self(BTreeMap::new())
+ }
+
+ /// Low level index updating logic (used to add/rename/delete) an entry
+ fn bind(&mut self, name: &str, id: Option<UniqueIdent>) -> Option<()> {
+ let (ts, id) = match self.0.get_mut(name) {
+ None => {
+ if id.is_none() {
+ // User wants to delete entry with given name (passed id is None)
+ // Entry does not exist (get_mut is None)
+ // Nothing to do
+ return None;
+ } else {
+ // User wants entry with given name to be present (id is Some)
+ // Entry does not exist
+ // Initialize entry
+ (now_msec(), id)
+ }
+ }
+ Some(CalendarListEntry { id_lww }) => {
+ if id_lww.1 == id {
+ // Entry is already equals to the requested id (Option<UniqueIdent)
+ // Nothing to do
+ return None;
+ } else {
+ // Entry does not equal to what we know internally
+ // We update the Last Write Win CRDT here with the new id value
+ (std::cmp::max(id_lww.0 + 1, now_msec()), id)
+ }
+ }
+ };
+
+ // If we did not return here, that's because we have to update
+ // something in our internal index.
+ self.0
+ .insert(name.into(), CalendarListEntry { id_lww: (ts, id) });
+ Some(())
+ }
+
+ // Merge 2 calendar lists by applying a LWW logic on each element
+ fn merge(&mut self, list2: Self) {
+ for (k, v) in list2.0.into_iter() {
+ if let Some(e) = self.0.get_mut(&k) {
+ e.merge(&v);
+ } else {
+ self.0.insert(k, v);
+ }
+ }
+ }
+}
+
+impl CalendarListEntry {
+ fn merge(&mut self, other: &Self) {
+ // Simple CRDT merge rule
+ if other.id_lww.0 > self.id_lww.0
+ || (other.id_lww.0 == self.id_lww.0 && other.id_lww.1 > self.id_lww.1)
+ {
+ self.id_lww = other.id_lww;
+ }
+ }
+}
+
+pub(crate) enum CalendarExists {
+ Created(UniqueIdent),
+ Existed(UniqueIdent),
+}