aboutsummaryrefslogtreecommitdiff
path: root/aero-collections/src/calendar/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'aero-collections/src/calendar/mod.rs')
-rw-r--r--aero-collections/src/calendar/mod.rs204
1 files changed, 204 insertions, 0 deletions
diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs
new file mode 100644
index 0000000..ac07842
--- /dev/null
+++ b/aero-collections/src/calendar/mod.rs
@@ -0,0 +1,204 @@
+pub mod namespace;
+
+use anyhow::{anyhow, bail, Result};
+use tokio::sync::RwLock;
+
+use aero_bayou::Bayou;
+use aero_user::cryptoblob::{self, gen_key, Key};
+use aero_user::login::Credentials;
+use aero_user::storage::{self, BlobRef, BlobVal, Store};
+
+use crate::davdag::{BlobId, DavDag, IndexEntry, SyncChange, Token};
+use crate::unique_ident::*;
+
+pub struct Calendar {
+ pub(super) id: UniqueIdent,
+ internal: RwLock<CalendarInternal>,
+}
+
+impl Calendar {
+ pub(crate) async fn open(creds: &Credentials, id: UniqueIdent) -> Result<Self> {
+ let bayou_path = format!("calendar/dag/{}", id);
+ let cal_path = format!("calendar/events/{}", id);
+
+ let mut davdag = Bayou::<DavDag>::new(creds, bayou_path).await?;
+ davdag.sync().await?;
+
+ let internal = RwLock::new(CalendarInternal {
+ id,
+ encryption_key: creds.keys.master.clone(),
+ storage: creds.storage.build().await?,
+ davdag,
+ cal_path,
+ });
+
+ Ok(Self { id, internal })
+ }
+
+ // ---- DAG sync utilities
+
+ /// Sync data with backing store
+ pub async fn force_sync(&self) -> Result<()> {
+ self.internal.write().await.force_sync().await
+ }
+
+ /// Sync data with backing store only if changes are detected
+ /// or last sync is too old
+ pub async fn opportunistic_sync(&self) -> Result<()> {
+ self.internal.write().await.opportunistic_sync().await
+ }
+
+ // ---- Data API
+
+ /// Access the DAG internal data (you can get the list of files for example)
+ pub async fn dag(&self) -> DavDag {
+ // Cloning is cheap
+ self.internal.read().await.davdag.state().clone()
+ }
+
+ /// Access the current token
+ pub async fn token(&self) -> Result<Token> {
+ self.internal.write().await.current_token().await
+ }
+
+ /// The diff API is a write API as we might need to push a merge node
+ /// to get a new sync token
+ pub async fn diff(&self, sync_token: Token) -> Result<(Token, Vec<SyncChange>)> {
+ self.internal.write().await.diff(sync_token).await
+ }
+
+ /// Get a specific event
+ pub async fn get(&self, evt_id: UniqueIdent) -> Result<Vec<u8>> {
+ self.internal.read().await.get(evt_id).await
+ }
+
+ /// Put a specific event
+ pub async fn put<'a>(&self, name: &str, evt: &'a [u8]) -> Result<(Token, IndexEntry)> {
+ self.internal.write().await.put(name, evt).await
+ }
+
+ /// Delete a specific event
+ pub async fn delete(&self, blob_id: UniqueIdent) -> Result<Token> {
+ self.internal.write().await.delete(blob_id).await
+ }
+}
+
+use base64::Engine;
+const MESSAGE_KEY: &str = "message-key";
+struct CalendarInternal {
+ #[allow(dead_code)]
+ id: UniqueIdent,
+ cal_path: String,
+ encryption_key: Key,
+ storage: Store,
+ davdag: Bayou<DavDag>,
+}
+
+impl CalendarInternal {
+ async fn force_sync(&mut self) -> Result<()> {
+ self.davdag.sync().await?;
+ Ok(())
+ }
+
+ async fn opportunistic_sync(&mut self) -> Result<()> {
+ self.davdag.opportunistic_sync().await?;
+ Ok(())
+ }
+
+ async fn get(&self, blob_id: BlobId) -> Result<Vec<u8>> {
+ // Fetch message from S3
+ let blob_ref = storage::BlobRef(format!("{}/{}", self.cal_path, blob_id));
+ let object = self.storage.blob_fetch(&blob_ref).await?;
+
+ // Decrypt message key from headers
+ let key_encrypted_b64 = object
+ .meta
+ .get(MESSAGE_KEY)
+ .ok_or(anyhow!("Missing key in metadata"))?;
+ let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?;
+ let message_key_raw = cryptoblob::open(&key_encrypted, &self.encryption_key)?;
+ let message_key =
+ cryptoblob::Key::from_slice(&message_key_raw).ok_or(anyhow!("Invalid message key"))?;
+
+ // Decrypt body
+ let body = object.value;
+ cryptoblob::open(&body, &message_key)
+ }
+
+ async fn put<'a>(&mut self, name: &str, evt: &'a [u8]) -> Result<(Token, IndexEntry)> {
+ let message_key = gen_key();
+ let blob_id = gen_ident();
+
+ let encrypted_msg_key = cryptoblob::seal(&message_key.as_ref(), &self.encryption_key)?;
+ let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_msg_key);
+
+ // Write event to S3
+ let message_blob = cryptoblob::seal(evt, &message_key)?;
+ let blob_val = BlobVal::new(
+ BlobRef(format!("{}/{}", self.cal_path, blob_id)),
+ message_blob,
+ )
+ .with_meta(MESSAGE_KEY.to_string(), key_header);
+
+ let etag = self.storage.blob_insert(blob_val).await?;
+
+ // Add entry to Bayou
+ let entry: IndexEntry = (blob_id, name.to_string(), etag);
+ let davstate = self.davdag.state();
+ let put_op = davstate.op_put(entry.clone());
+ let token = put_op.token();
+ self.davdag.push(put_op).await?;
+
+ Ok((token, entry))
+ }
+
+ async fn delete(&mut self, blob_id: BlobId) -> Result<Token> {
+ let davstate = self.davdag.state();
+
+ if !davstate.table.contains_key(&blob_id) {
+ bail!("Cannot delete event that doesn't exist");
+ }
+
+ let del_op = davstate.op_delete(blob_id);
+ let token = del_op.token();
+ self.davdag.push(del_op).await?;
+
+ let blob_ref = BlobRef(format!("{}/{}", self.cal_path, blob_id));
+ self.storage.blob_rm(&blob_ref).await?;
+
+ Ok(token)
+ }
+
+ async fn diff(&mut self, sync_token: Token) -> Result<(Token, Vec<SyncChange>)> {
+ let davstate = self.davdag.state();
+
+ let token_changed = davstate.resolve(sync_token)?;
+ let changes = token_changed
+ .iter()
+ .filter_map(|t: &Token| davstate.change.get(t))
+ .map(|s| s.clone())
+ .filter(|s| match s {
+ SyncChange::Ok((filename, _)) => davstate.idx_by_filename.get(filename).is_some(),
+ SyncChange::NotFound(filename) => davstate.idx_by_filename.get(filename).is_none(),
+ })
+ .collect();
+
+ let token = self.current_token().await?;
+ Ok((token, changes))
+ }
+
+ async fn current_token(&mut self) -> Result<Token> {
+ let davstate = self.davdag.state();
+ let heads = davstate.heads_vec();
+ let token = match heads.as_slice() {
+ [token] => *token,
+ _ => {
+ let op_mg = davstate.op_merge();
+ let token = op_mg.token();
+ self.davdag.push(op_mg).await?;
+ token
+ }
+ };
+ Ok(token)
+ }
+}