aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--aero-collections/src/calendar/mod.rs53
1 files changed, 46 insertions, 7 deletions
diff --git a/aero-collections/src/calendar/mod.rs b/aero-collections/src/calendar/mod.rs
index 936f8c3..7e5a8c1 100644
--- a/aero-collections/src/calendar/mod.rs
+++ b/aero-collections/src/calendar/mod.rs
@@ -38,6 +38,8 @@ impl Calendar {
Ok(Self { id, internal })
}
+ // ---- DAG sync utilities
+
/// Sync data with backing store
pub async fn force_sync(&self) -> Result<()> {
self.internal.write().await.force_sync().await
@@ -49,24 +51,40 @@ impl Calendar {
self.internal.write().await.opportunistic_sync().await
}
- pub async fn get(&self, blob_id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
- self.internal.read().await.get(blob_id, message_key).await
+ // ---- Data API
+
+ /// Access the DAG internal data (you can get the list of files for example)
+ pub async fn dag(&self) -> DavDag {
+ // Cloning is cheap
+ self.internal.read().await.davdag.state().clone()
}
+ /// The diff API is a write API as we might need to push a merge node
+ /// to get a new sync token
pub async fn diff(&self, sync_token: Token) -> Result<(Token, Vec<SyncChange>)> {
- self.internal.read().await.diff(sync_token).await
+ self.internal.write().await.diff(sync_token).await
}
+ /// Get a specific event
+ pub async fn get(&self, evt_id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
+ self.internal.read().await.get(evt_id, message_key).await
+ }
+
+ /// Put a specific event
pub async fn put<'a>(&self, entry: IndexEntry, evt: &'a [u8]) -> Result<Token> {
self.internal.write().await.put(entry, evt).await
}
+ /// Delete a specific event
pub async fn delete(&self, blob_id: UniqueIdent) -> Result<Token> {
self.internal.write().await.delete(blob_id).await
}
}
+use base64::Engine;
+const MESSAGE_KEY: &str = "message-key";
struct CalendarInternal {
+ #[allow(dead_code)]
id: UniqueIdent,
cal_path: String,
encryption_key: Key,
@@ -90,16 +108,37 @@ impl CalendarInternal {
}
async fn put<'a>(&mut self, entry: IndexEntry, evt: &'a [u8]) -> Result<Token> {
- //@TODO write event to S3
- //@TODO add entry into Bayou
- todo!();
+ let message_key = gen_key();
+
+ let encrypted_msg_key = cryptoblob::seal(&message_key.as_ref(), &self.encryption_key)?;
+ let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_msg_key);
+
+ // Write event to S3
+ let message_blob = cryptoblob::seal(evt, &message_key)?;
+ let blob_val = BlobVal::new(
+ BlobRef(format!("{}/{}", self.cal_path, entry.0)),
+ message_blob,
+ )
+ .with_meta(MESSAGE_KEY.to_string(), key_header);
+
+ self.storage
+ .blob_insert(blob_val)
+ .await?;
+
+ // Add entry to Bayou
+ let davstate = self.davdag.state();
+ let put_op = davstate.op_put(entry);
+ let token = put_op.token();
+ self.davdag.push(put_op).await?;
+
+ Ok(token)
}
async fn delete(&mut self, blob_id: BlobId) -> Result<Token> {
todo!();
}
- async fn diff(&self, sync_token: Token) -> Result<(Token, Vec<SyncChange>)> {
+ async fn diff(&mut self, sync_token: Token) -> Result<(Token, Vec<SyncChange>)> {
todo!();
}
}