aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/bayou.rs8
-rw-r--r--src/mail/user.rs41
-rw-r--r--src/storage/garage.rs9
-rw-r--r--src/storage/in_memory.rs15
-rw-r--r--src/storage/mod.rs14
5 files changed, 66 insertions, 21 deletions
diff --git a/src/bayou.rs b/src/bayou.rs
index f95bd82..d3027c5 100644
--- a/src/bayou.rs
+++ b/src/bayou.rs
@@ -443,7 +443,7 @@ impl<S: BayouState> Bayou<S> {
struct K2vWatch {
pk: String,
sk: String,
- rx: watch::Receiver<storage::RowRef>,
+ rx: watch::Receiver<storage::OrphanRowRef>,
notify: Notify,
}
@@ -454,7 +454,7 @@ impl K2vWatch {
fn new(creds: &Credentials, pk: String, sk: String) -> Result<Arc<Self>> {
let row_client = creds.row_client()?;
- let (tx, rx) = watch::channel::<storage::RowRef>(row_client.row(&pk, &sk));
+ let (tx, rx) = watch::channel::<storage::OrphanRowRef>(row_client.row(&pk, &sk).to_orphan());
let notify = Notify::new();
let watch = Arc::new(K2vWatch { pk, sk, rx, notify });
@@ -471,7 +471,7 @@ impl K2vWatch {
async fn background_task(
self_weak: Weak<Self>,
k2v: storage::RowStore,
- tx: watch::Sender<storage::RowRef>,
+ tx: watch::Sender<storage::OrphanRowRef>,
) {
let mut row = match Weak::upgrade(&self_weak) {
Some(this) => k2v.row(&this.pk, &this.sk),
@@ -497,7 +497,7 @@ impl K2vWatch {
}
Ok(new_value) => {
row = new_value.to_ref();
- if tx.send(XXX).is_err() {
+ if tx.send(row.to_orphan()).is_err() {
break;
}
}
diff --git a/src/mail/user.rs b/src/mail/user.rs
index 6d3bc1a..7011dcc 100644
--- a/src/mail/user.rs
+++ b/src/mail/user.rs
@@ -81,7 +81,11 @@ impl User {
let mb_uidvalidity = mb.current_uid_index().await.uidvalidity;
if mb_uidvalidity > uidvalidity {
list.update_uidvalidity(name, mb_uidvalidity);
- self.save_mailbox_list(&list, ct).await?;
+ let orphan = match ct {
+ Some(x) => Some(x.to_orphan()),
+ None => None,
+ };
+ self.save_mailbox_list(&list, orphan).await?;
}
Ok(Some(mb))
} else {
@@ -104,7 +108,11 @@ impl User {
let (mut list, ct) = self.load_mailbox_list().await?;
match list.create_mailbox(name) {
CreatedMailbox::Created(_, _) => {
- self.save_mailbox_list(&list, ct).await?;
+ let orphan = match ct {
+ Some(x) => Some(x.to_orphan()),
+ None => None,
+ };
+ self.save_mailbox_list(&list, orphan).await?;
Ok(())
}
CreatedMailbox::Existed(_, _) => Err(anyhow!("Mailbox {} already exists", name)),
@@ -121,7 +129,11 @@ impl User {
if list.has_mailbox(name) {
// TODO: actually delete mailbox contents
list.set_mailbox(name, None);
- self.save_mailbox_list(&list, ct).await?;
+ let orphan = match ct {
+ Some(x) => Some(x.to_orphan()),
+ None => None,
+ };
+ self.save_mailbox_list(&list, orphan).await?;
Ok(())
} else {
bail!("Mailbox {} does not exist", name);
@@ -142,7 +154,11 @@ impl User {
if old_name == INBOX {
list.rename_mailbox(old_name, new_name)?;
if !self.ensure_inbox_exists(&mut list, &ct).await? {
- self.save_mailbox_list(&list, ct).await?;
+ let orphan = match ct {
+ Some(x) => Some(x.to_orphan()),
+ None => None,
+ };
+ self.save_mailbox_list(&list, orphan).await?;
}
} else {
let names = list.existing_mailbox_names();
@@ -165,7 +181,12 @@ impl User {
list.rename_mailbox(name, &nnew)?;
}
}
- self.save_mailbox_list(&list, ct).await?;
+
+ let orphan = match ct {
+ Some(x) => Some(x.to_orphan()),
+ None => None,
+ };
+ self.save_mailbox_list(&list, orphan).await?;
}
Ok(())
}
@@ -257,7 +278,11 @@ impl User {
let saved;
let (inbox_id, inbox_uidvalidity) = match list.create_mailbox(INBOX) {
CreatedMailbox::Created(i, v) => {
- self.save_mailbox_list(list, ct.clone()).await?;
+ let orphan = match ct {
+ Some(x) => Some(x.to_orphan()),
+ None => None,
+ };
+ self.save_mailbox_list(list, orphan).await?;
saved = true;
(i, v)
}
@@ -277,11 +302,11 @@ impl User {
async fn save_mailbox_list(
&self,
list: &MailboxList,
- ct: Option<storage::RowRef>,
+ ct: Option<storage::OrphanRowRef>,
) -> Result<()> {
let list_blob = seal_serialize(list, &self.creds.keys.master)?;
let rref = match ct {
- Some(x) => x,
+ Some(x) => self.k2v.from_orphan(x),
None => self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK),
};
rref.set_value(list_blob).push().await?;
diff --git a/src/storage/garage.rs b/src/storage/garage.rs
index aef9a0d..d6ac7ac 100644
--- a/src/storage/garage.rs
+++ b/src/storage/garage.rs
@@ -6,6 +6,9 @@ pub struct GrgStore {}
pub struct GrgRef {}
pub struct GrgValue {}
+#[derive(Clone, Debug)]
+pub struct GrgOrphanRowRef {}
+
impl IBuilders for GrgCreds {
fn row_store(&self) -> Result<RowStore, StorageError> {
unimplemented!();
@@ -32,13 +35,17 @@ impl IRowStore for GrgStore {
fn rm(&self, selector: Selector) -> AsyncResult<()> {
unimplemented!();
}
+
+ fn from_orphan(&self, orphan: OrphanRowRef) -> RowRef {
+ unimplemented!();
+ }
}
impl IRowRef for GrgRef {
/*fn clone_boxed(&self) -> RowRef {
unimplemented!();
}*/
- fn to_orphan(&self) -> RowRefOrphan {
+ fn to_orphan(&self) -> OrphanRowRef {
unimplemented!()
}
diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs
index a4436e6..0bdf9b1 100644
--- a/src/storage/in_memory.rs
+++ b/src/storage/in_memory.rs
@@ -7,6 +7,9 @@ pub struct MemStore {}
pub struct MemRef {}
pub struct MemValue {}
+#[derive(Clone, Debug)]
+pub struct MemOrphanRowRef {}
+
impl IBuilders for FullMem {
fn row_store(&self) -> Result<RowStore, StorageError> {
unimplemented!();
@@ -33,16 +36,24 @@ impl IRowStore for MemStore {
fn rm(&self, selector: Selector) -> AsyncResult<()> {
unimplemented!();
}
+
+ fn from_orphan(&self, orphan: OrphanRowRef) -> RowRef {
+ unimplemented!();
+ }
}
impl IRowRef for MemRef {
+ fn to_orphan(&self) -> OrphanRowRef {
+ unimplemented!()
+ }
+
fn key(&self) -> (&str, &str) {
unimplemented!();
}
- fn clone_boxed(&self) -> RowRef {
+ /*fn clone_boxed(&self) -> RowRef {
unimplemented!();
- }
+ }*/
fn set_value(&self, content: Vec<u8>) -> RowValue {
unimplemented!();
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 2e3c0ee..c9a49c5 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -20,6 +20,12 @@ pub enum Alternative {
}
type ConcurrentValues = Vec<Alternative>;
+#[derive(Clone, Debug)]
+pub enum OrphanRowRef {
+ Garage(garage::GrgOrphanRowRef),
+ Memory(in_memory::MemOrphanRowRef),
+}
+
pub enum Selector<'a> {
Range { shard_key: &'a str, begin: &'a str, end: &'a str },
List (Vec<(&'a str, &'a str)>), // list of (shard_key, sort_key)
@@ -81,13 +87,14 @@ pub trait IRowStore
fn row(&self, partition: &str, sort: &str) -> RowRef;
fn select(&self, selector: Selector) -> AsyncResult<Vec<RowValue>>;
fn rm(&self, selector: Selector) -> AsyncResult<()>;
+ fn from_orphan(&self, orphan: OrphanRowRef) -> RowRef;
}
pub type RowStore = Box<dyn IRowStore + Sync + Send>;
pub trait IRowRef
{
/*fn clone_boxed(&self) -> RowRef;*/
- fn to_orphan(&self) -> RowRefOrphan;
+ fn to_orphan(&self) -> OrphanRowRef;
fn key(&self) -> (&str, &str);
fn set_value(&self, content: Vec<u8>) -> RowValue;
fn fetch(&self) -> AsyncResult<RowValue>;
@@ -101,11 +108,6 @@ pub type RowRef = Box<dyn IRowRef + Send + Sync>;
}
}*/
-pub trait IRowRefOrphan
-{
- fn attach(&self, store: &RowStore) -> RowRef;
-}
-pub type RowRefOrphan = Box<dyn IRowRefOrphan + Send + Sync>;
pub trait IRowValue
{