aboutsummaryrefslogtreecommitdiff
path: root/aero-collections/src/calendar/mod.rs
blob: 0e0e65f81b81c8dfd9d4c1c79c659d5776f96517 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
pub mod namespace;

use anyhow::{anyhow, Result};
use tokio::sync::RwLock;

use aero_bayou::Bayou;
use aero_user::login::Credentials;
use aero_user::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key};
use aero_user::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store};

use crate::unique_ident::*;
use crate::davdag::{DavDag, IndexEntry, Token, BlobId, SyncChange};

/// Handle on a single calendar.
///
/// All mutable state lives in a `CalendarInternal` guarded by an async
/// read/write lock; the methods on this type acquire the lock and delegate.
pub struct Calendar {
    /// Unique identifier of this calendar.
    pub(super) id: UniqueIdent,
    /// Lock-protected state (DAG, storage handle, encryption key).
    internal: RwLock<CalendarInternal>,
}

impl Calendar {
    /// Opens the calendar identified by `id` with the given credentials.
    ///
    /// The DAG log is created under `calendar/dag/<id>` and synced once before
    /// the storage handle is built; event blobs live under `calendar/events/<id>`.
    ///
    /// # Errors
    ///
    /// Returns an error if the DAG log cannot be created or synced, or if the
    /// storage handle cannot be built.
    pub(crate) async fn open(creds: &Credentials, id: UniqueIdent) -> Result<Self> {
        let mut davdag = Bayou::<DavDag>::new(creds, format!("calendar/dag/{}", id)).await?;
        davdag.sync().await?;

        let encryption_key = creds.keys.master.clone();
        let storage = creds.storage.build().await?;
        let cal_path = format!("calendar/events/{}", id);

        let state = CalendarInternal {
            id,
            cal_path,
            encryption_key,
            storage,
            davdag,
        };

        Ok(Self {
            id,
            internal: RwLock::new(state),
        })
    }

    // ---- DAG sync utilities

    /// Unconditionally syncs the DAG with the backing store.
    ///
    /// Takes the write lock for the duration of the sync.
    pub async fn force_sync(&self) -> Result<()> {
        let mut guard = self.internal.write().await;
        guard.force_sync().await
    }

    /// Syncs with the backing store only if changes are detected
    /// or the last sync is too old.
    ///
    /// Takes the write lock for the duration of the call.
    pub async fn opportunistic_sync(&self) -> Result<()> {
        let mut guard = self.internal.write().await;
        guard.opportunistic_sync().await
    }

    // ---- Data API

    /// Returns a copy of the current DAG state (e.g. to list the known files).
    pub async fn dag(&self) -> DavDag {
        let guard = self.internal.read().await;
        // Cloning is cheap
        guard.davdag.state().clone()
    }

    /// Computes the changes since `sync_token`.
    ///
    /// This is a write API: a merge node might have to be pushed
    /// to obtain a new sync token, hence the write lock.
    pub async fn diff(&self, sync_token: Token) -> Result<(Token, Vec<SyncChange>)> {
        let mut guard = self.internal.write().await;
        guard.diff(sync_token).await
    }

    /// Fetches and decrypts the event stored under `evt_id`.
    pub async fn get(&self, evt_id: UniqueIdent) -> Result<Vec<u8>> {
        let guard = self.internal.read().await;
        guard.get(evt_id).await
    }

    /// Stores the event `evt` for the index entry `entry` and returns the
    /// token of the resulting DAG operation.
    pub async fn put<'a>(&self, entry: IndexEntry, evt: &'a [u8]) -> Result<Token> {
        let mut guard = self.internal.write().await;
        guard.put(entry, evt).await
    }

    /// Deletes the event stored under `blob_id`.
    pub async fn delete(&self, blob_id: UniqueIdent) -> Result<Token> {
        let mut guard = self.internal.write().await;
        guard.delete(blob_id).await
    }
}

use base64::Engine;
/// Blob metadata key under which the sealed, base64-encoded per-event key is stored.
const MESSAGE_KEY: &str = "message-key";
/// Lock-protected state of a `Calendar`: everything needed to sync the DAG
/// and to read/write encrypted event blobs.
struct CalendarInternal {
    /// Identifier of the calendar; kept although no method in this file reads it,
    /// hence the `dead_code` allowance.
    #[allow(dead_code)]
    id: UniqueIdent,
    /// Path prefix of the event blobs; blob references are built as `<cal_path>/<blob id>`.
    cal_path: String,
    /// Key used to seal and open the per-event keys stored in blob metadata.
    encryption_key: Key,
    /// Storage handle used to fetch and insert event blobs.
    storage: Store,
    /// Bayou log holding the `DavDag` state of this calendar.
    davdag: Bayou<DavDag>,
}

impl CalendarInternal {
    /// Unconditionally syncs the DAG with the backing store.
    async fn force_sync(&mut self) -> Result<()> {
        self.davdag.sync().await?;
        Ok(())
    }

    /// Lets the DAG decide whether a sync with the backing store is needed.
    async fn opportunistic_sync(&mut self) -> Result<()> {
        self.davdag.opportunistic_sync().await?;
        Ok(())
    }

    /// Fetches the blob stored under `blob_id` and returns its decrypted content.
    ///
    /// # Errors
    ///
    /// Returns an error if the blob cannot be fetched, if its metadata carries no
    /// `MESSAGE_KEY` entry, if that entry is not valid base64, if the per-event
    /// key cannot be opened with the calendar key, or if the body cannot be opened
    /// with the per-event key.
    async fn get(&self, blob_id: BlobId) -> Result<Vec<u8>> {
        // Fetch message from S3
        let blob_ref = storage::BlobRef(format!("{}/{}", self.cal_path, blob_id));
        let object = self.storage.blob_fetch(&blob_ref).await?;

        // Decrypt message key from headers.
        // `ok_or_else` keeps the error construction off the success path.
        let key_encrypted_b64 = object
            .meta
            .get(MESSAGE_KEY)
            .ok_or_else(|| anyhow!("Missing key in metadata"))?;
        let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?;
        let message_key_raw = cryptoblob::open(&key_encrypted, &self.encryption_key)?;
        let message_key = cryptoblob::Key::from_slice(&message_key_raw)
            .ok_or_else(|| anyhow!("Invalid message key"))?;

        // Decrypt body
        cryptoblob::open(&object.value, &message_key)
    }

    /// Encrypts `evt` with a fresh per-event key, stores it as a blob named after
    /// `entry.0`, then records `entry` in the DAG.
    ///
    /// The per-event key is itself sealed with the calendar key and attached,
    /// base64-encoded, to the blob metadata under `MESSAGE_KEY`.
    ///
    /// Returns the token of the pushed DAG operation.
    ///
    /// # Errors
    ///
    /// Returns an error if sealing fails, if the blob cannot be inserted, or if
    /// the operation cannot be pushed to the DAG. The blob is written before the
    /// DAG is updated, so a failed push leaves the blob in storage.
    async fn put(&mut self, entry: IndexEntry, evt: &[u8]) -> Result<Token> {
        let message_key = gen_key();

        let encrypted_msg_key = cryptoblob::seal(message_key.as_ref(), &self.encryption_key)?;
        let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_msg_key);

        // Write event to S3
        let message_blob = cryptoblob::seal(evt, &message_key)?;
        let blob_val = BlobVal::new(
            BlobRef(format!("{}/{}", self.cal_path, entry.0)),
            message_blob,
        )
        .with_meta(MESSAGE_KEY.to_string(), key_header);

        self.storage.blob_insert(blob_val).await?;

        // Add entry to Bayou
        let davstate = self.davdag.state();
        let put_op = davstate.op_put(entry);
        let token = put_op.token();
        self.davdag.push(put_op).await?;

        Ok(token)
    }

    /// Deletes the event stored under `blob_id`.
    ///
    /// # Panics
    ///
    /// Not implemented yet: always panics.
    async fn delete(&mut self, blob_id: BlobId) -> Result<Token> {
        todo!();
    }

    /// Computes the changes since `sync_token`.
    ///
    /// # Panics
    ///
    /// Not implemented yet: always panics.
    async fn diff(&mut self, sync_token: Token) -> Result<(Token, Vec<SyncChange>)> {
        todo!();
    }
}