aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-05-18 16:03:27 +0200
committerAlex Auvolat <alex@adnab.me>2022-05-18 16:03:27 +0200
commitaeec77ae444789b3bbfd9e9e6ef8c7d33eee48e1 (patch)
treef3ed5dcd3a8d0ffe6840c2c396100f7e098ddfb2
parentf3cddcb485978b0508caa7a0efeeed14a11fdd6b (diff)
downloadaerogramme-aeec77ae444789b3bbfd9e9e6ef8c7d33eee48e1.tar.gz
aerogramme-aeec77ae444789b3bbfd9e9e6ef8c7d33eee48e1.zip
Don't try to checkpoint as often
-rw-r--r--src/bayou.rs20
-rw-r--r--src/main.rs18
2 files changed, 37 insertions, 1 deletions
diff --git a/src/bayou.rs b/src/bayou.rs
index 47a027a..ca5158e 100644
--- a/src/bayou.rs
+++ b/src/bayou.rs
@@ -53,6 +53,7 @@ pub struct Bayou<S: BayouState> {
checkpoint: (Timestamp, S),
history: Vec<(Timestamp, S::Op, Option<S>)>,
last_sync: Option<Instant>,
+ last_try_checkpoint: Option<Instant>,
}
impl<S: BayouState> Bayou<S> {
@@ -82,6 +83,7 @@ impl<S: BayouState> Bayou<S> {
checkpoint: (Timestamp::zero(), S::default()),
history: vec![],
last_sync: None,
+ last_try_checkpoint: None,
})
}
@@ -107,6 +109,8 @@ impl<S: BayouState> Bayou<S> {
let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize);
obj_body.into_async_read().read_to_end(&mut buf).await?;
+ eprintln!("(sync) checkpoint body length: {}", buf.len());
+
let ck = open_deserialize::<S>(&buf, &self.key)?;
(*ts, Some(ck))
}
@@ -250,6 +254,8 @@ impl<S: BayouState> Bayou<S> {
pub async fn push(&mut self, op: S::Op) -> Result<()> {
self.check_recent_sync().await?;
+ eprintln!("(push) add operation: {:?}", op);
+
let ts = Timestamp::after(
self.history
.last()
@@ -281,6 +287,19 @@ impl<S: BayouState> Bayou<S> {
/// Save a new checkpoint if previous checkpoint is too old
pub async fn checkpoint(&mut self) -> Result<()> {
+ match self.last_try_checkpoint {
+ Some(ts) if Instant::now() - ts < CHECKPOINT_INTERVAL / 10 => Ok(()),
+ _ => {
+ let res = self.checkpoint_internal().await;
+ if res.is_ok() {
+ self.last_try_checkpoint = Some(Instant::now());
+ }
+ res
+ }
+ }
+ }
+
+ async fn checkpoint_internal(&mut self) -> Result<()> {
self.check_recent_sync().await?;
// Check what would be the possible time for a checkpoint in the history we have
@@ -347,6 +366,7 @@ impl<S: BayouState> Bayou<S> {
// Serialize and save checkpoint
let cryptoblob = seal_serialize(&state_cp, &self.key)?;
+ eprintln!("(cp) checkpoint body length: {}", cryptoblob.len());
let mut por = PutObjectRequest::default();
por.bucket = self.bucket.clone();
diff --git a/src/main.rs b/src/main.rs
index 461cb11..7b46f01 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -47,7 +47,7 @@ async fn do_stuff() -> Result<()> {
dump(&uid_index);
let mut rand_id = [0u8; 24];
- rand_id[..8].copy_from_slice(&u64::to_be_bytes(thread_rng().gen()));
+ rand_id[..16].copy_from_slice(&u128::to_be_bytes(thread_rng().gen()));
let add_mail_op = uid_index
.state()
.op_mail_add(MailUuid(rand_id), vec!["\\Unseen".into()]);
@@ -55,6 +55,22 @@ async fn do_stuff() -> Result<()> {
dump(&uid_index);
+ if uid_index.state().mails_by_uid.len() > 6 {
+ for i in 0..2 {
+ let (_, uuid) = uid_index
+ .state()
+ .mails_by_uid
+ .iter()
+ .skip(3 + i)
+ .next()
+ .unwrap();
+ let del_mail_op = uid_index.state().op_mail_del(*uuid);
+ uid_index.push(del_mail_op).await?;
+
+ dump(&uid_index);
+ }
+ }
+
Ok(())
}