diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mail/incoming.rs | 6 | ||||
-rw-r--r-- | src/mail/user.rs | 34 | ||||
-rw-r--r-- | src/storage/garage.rs | 2 | ||||
-rw-r--r-- | src/storage/in_memory.rs | 2 | ||||
-rw-r--r-- | src/storage/mod.rs | 10 |
5 files changed, 28 insertions, 26 deletions
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index b7d2f48..3ea7d6a 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -54,10 +54,10 @@ async fn incoming_mail_watch_process_internal( creds: Credentials, mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>, ) -> Result<()> { - let mut lock_held = k2v_lock_loop(creds.k2v_client()?, INCOMING_PK, INCOMING_LOCK_SK); + let mut lock_held = k2v_lock_loop(creds.row_client()?, INCOMING_PK, INCOMING_LOCK_SK); - let k2v = creds.k2v_client()?; - let s3 = creds.s3_client()?; + let k2v = creds.row_client()?; + let s3 = creds.blob_client()?; let mut inbox: Option<Arc<Mailbox>> = None; let mut prev_ct: Option<CausalityToken> = None; diff --git a/src/mail/user.rs b/src/mail/user.rs index 2104455..3b8d4e7 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -34,7 +34,7 @@ const MAILBOX_LIST_SK: &str = "list"; pub struct User { pub username: String, pub creds: Credentials, - pub k2v: K2vClient, + pub k2v: storage::RowStore, pub mailboxes: std::sync::Mutex<HashMap<UniqueIdent, Weak<Mailbox>>>, tx_inbox_id: watch::Sender<Option<(UniqueIdent, ImapUidvalidity)>>, @@ -174,7 +174,7 @@ impl User { // ---- Internal user & mailbox management ---- async fn open(username: String, creds: Credentials) -> Result<Arc<Self>> { - let k2v = creds.k2v_client()?; + let k2v = creds.row_client()?; let (tx_inbox_id, rx_inbox_id) = watch::channel(None); @@ -224,32 +224,32 @@ impl User { // ---- Mailbox list management ---- - async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<CausalityToken>)> { - let (mut list, ct) = match self.k2v.read_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK).await { - Err(k2v_client::Error::NotFound) => (MailboxList::new(), None), + async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<storage::RowRef>)> { + let (mut list, row) = match self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK).fetch().await { + Err(storage::StorageError::NotFound) => (MailboxList::new(), None), Err(e) => return Err(e.into()), - Ok(cv) => { + Ok(rv) => { let mut list = MailboxList::new(); - for v in cv.value { - if let K2vValue::Value(vbytes) = v { + for v in rv.content() { + if let storage::Alternative::Value(vbytes) = v { let list2 = open_deserialize::<MailboxList>(&vbytes, &self.creds.keys.master)?; list.merge(list2); } } - (list, Some(cv.causality)) + (list, Some(rv.to_ref())) } }; - self.ensure_inbox_exists(&mut list, &ct).await?; + self.ensure_inbox_exists(&mut list, &row).await?; - Ok((list, ct)) + Ok((list, row)) } async fn ensure_inbox_exists( &self, list: &mut MailboxList, - ct: &Option<CausalityToken>, + ct: &Option<storage::RowRef>, ) -> Result<bool> { // If INBOX doesn't exist, create a new mailbox with that name // and save new mailbox list. @@ -278,12 +278,14 @@ impl User { async fn save_mailbox_list( &self, list: &MailboxList, - ct: Option<CausalityToken>, + ct: Option<storage::RowRef>, ) -> Result<()> { let list_blob = seal_serialize(list, &self.creds.keys.master)?; - self.k2v - .insert_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK, list_blob, ct) - .await?; + let rref = match ct { + Some(x) => x, + None => self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK), + }; + rref.set_value(list_blob).push().await?; Ok(()) } } diff --git a/src/storage/garage.rs b/src/storage/garage.rs index 6dea00c..595a57c 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -21,7 +21,7 @@ impl IBuilders for GrgCreds { } impl IRowStore for GrgStore { - fn new_row(&self, partition: &str, sort: &str) -> RowRef { + fn row(&self, partition: &str, sort: &str) -> RowRef { unimplemented!(); } } diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs index 5cc8ef8..19b55b9 100644 --- a/src/storage/in_memory.rs +++ b/src/storage/in_memory.rs @@ -22,7 +22,7 @@ impl IBuilders for FullMem { } impl IRowStore for MemStore { - fn new_row(&self, partition: &str, sort: &str) -> RowRef { + fn row(&self, partition: &str, sort: &str) -> RowRef { unimplemented!(); } } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 0939463..3e66e84 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -77,7 +77,7 @@ impl Hash for Builders { // ------ Row pub trait IRowStore { - fn new_row(&self, partition: &str, sort: &str) -> RowRef; + fn row(&self, partition: &str, sort: &str) -> RowRef; } pub type RowStore = Box<dyn IRowStore + Sync + Send>; @@ -88,7 +88,7 @@ pub trait IRowRef fn rm(&self) -> AsyncResult<()>; fn poll(&self) -> AsyncResult<Option<RowValue>>; } -pub type RowRef = Box<dyn IRowRef>; +pub type RowRef = Box<dyn IRowRef + Send + Sync>; pub trait IRowValue { @@ -96,7 +96,7 @@ pub trait IRowValue fn content(&self) -> ConcurrentValues; fn push(&self) -> AsyncResult<()>; } -pub type RowValue = Box<dyn IRowValue>; +pub type RowValue = Box<dyn IRowValue + Send + Sync>; // ------- Blob pub trait IBlobStore @@ -113,10 +113,10 @@ pub trait IBlobRef fn copy(&self, dst: &BlobRef) -> AsyncResult<()>; fn rm(&self) -> AsyncResult<()>; } -pub type BlobRef = Box<dyn IBlobRef>; +pub type BlobRef = Box<dyn IBlobRef + Send + Sync>; pub trait IBlobValue { fn to_ref(&self) -> BlobRef; fn push(&self) -> AsyncResult<()>; } -pub type BlobValue = Box<dyn IBlobValue>; +pub type BlobValue = Box<dyn IBlobValue + Send + Sync>; |