diff options
author | Alex Auvolat <alex@adnab.me> | 2022-05-18 16:03:27 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-05-18 16:03:27 +0200 |
commit | aeec77ae444789b3bbfd9e9e6ef8c7d33eee48e1 (patch) | |
tree | f3ed5dcd3a8d0ffe6840c2c396100f7e098ddfb2 /src | |
parent | f3cddcb485978b0508caa7a0efeeed14a11fdd6b (diff) | |
download | aerogramme-aeec77ae444789b3bbfd9e9e6ef8c7d33eee48e1.tar.gz aerogramme-aeec77ae444789b3bbfd9e9e6ef8c7d33eee48e1.zip |
Don't try to checkpoint as often
Diffstat (limited to 'src')
-rw-r--r-- | src/bayou.rs | 20 | ||||
-rw-r--r-- | src/main.rs | 18 |
2 files changed, 37 insertions, 1 deletions
diff --git a/src/bayou.rs b/src/bayou.rs index 47a027a..ca5158e 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -53,6 +53,7 @@ pub struct Bayou<S: BayouState> { checkpoint: (Timestamp, S), history: Vec<(Timestamp, S::Op, Option<S>)>, last_sync: Option<Instant>, + last_try_checkpoint: Option<Instant>, } impl<S: BayouState> Bayou<S> { @@ -82,6 +83,7 @@ impl<S: BayouState> Bayou<S> { checkpoint: (Timestamp::zero(), S::default()), history: vec![], last_sync: None, + last_try_checkpoint: None, }) } @@ -107,6 +109,8 @@ impl<S: BayouState> Bayou<S> { let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize); obj_body.into_async_read().read_to_end(&mut buf).await?; + eprintln!("(sync) checkpoint body length: {}", buf.len()); + let ck = open_deserialize::<S>(&buf, &self.key)?; (*ts, Some(ck)) } @@ -250,6 +254,8 @@ impl<S: BayouState> Bayou<S> { pub async fn push(&mut self, op: S::Op) -> Result<()> { self.check_recent_sync().await?; + eprintln!("(push) add operation: {:?}", op); + let ts = Timestamp::after( self.history .last() @@ -281,6 +287,19 @@ impl<S: BayouState> Bayou<S> { /// Save a new checkpoint if previous checkpoint is too old pub async fn checkpoint(&mut self) -> Result<()> { + match self.last_try_checkpoint { + Some(ts) if Instant::now() - ts < CHECKPOINT_INTERVAL / 10 => Ok(()), + _ => { + let res = self.checkpoint_internal().await; + if res.is_ok() { + self.last_try_checkpoint = Some(Instant::now()); + } + res + } + } + } + + async fn checkpoint_internal(&mut self) -> Result<()> { self.check_recent_sync().await?; // Check what would be the possible time for a checkpoint in the history we have @@ -347,6 +366,7 @@ impl<S: BayouState> Bayou<S> { // Serialize and save checkpoint let cryptoblob = seal_serialize(&state_cp, &self.key)?; + eprintln!("(cp) checkpoint body length: {}", cryptoblob.len()); let mut por = PutObjectRequest::default(); por.bucket = self.bucket.clone(); diff --git a/src/main.rs b/src/main.rs index 461cb11..7b46f01 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,7 +47,7 @@ async fn do_stuff() -> Result<()> { dump(&uid_index); let mut rand_id = [0u8; 24]; - rand_id[..8].copy_from_slice(&u64::to_be_bytes(thread_rng().gen())); + rand_id[..16].copy_from_slice(&u128::to_be_bytes(thread_rng().gen())); let add_mail_op = uid_index .state() .op_mail_add(MailUuid(rand_id), vec!["\\Unseen".into()]); @@ -55,6 +55,22 @@ async fn do_stuff() -> Result<()> { dump(&uid_index); + if uid_index.state().mails_by_uid.len() > 6 { + for i in 0..2 { + let (_, uuid) = uid_index + .state() + .mails_by_uid + .iter() + .skip(3 + i) + .next() + .unwrap(); + let del_mail_op = uid_index.state().op_mail_del(*uuid); + uid_index.push(del_mail_op).await?; + + dump(&uid_index); + } + } + Ok(()) } |