Diffstat (limited to 'src')
-rw-r--r--   src/bayou.rs    | 236
-rw-r--r--   src/main.rs     |  50
-rw-r--r--   src/uidindex.rs |  16
3 files changed, 274 insertions, 28 deletions
diff --git a/src/bayou.rs b/src/bayou.rs
index 1ea8395..457a666 100644
--- a/src/bayou.rs
+++ b/src/bayou.rs
@@ -1,22 +1,31 @@
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
-use anyhow::Result;
+use anyhow::{anyhow, bail, Result};
 use rand::prelude::*;
 use serde::{Deserialize, Serialize};
+use tokio::io::AsyncReadExt;
 
-use k2v_client::K2vClient;
+use k2v_client::{BatchReadOp, Filter, K2vClient, K2vValue};
 use rusoto_core::HttpClient;
 use rusoto_credential::{AwsCredentials, StaticProvider};
-use rusoto_s3::S3Client;
+use rusoto_s3::{GetObjectRequest, ListObjectsV2Request, S3Client, S3};
 use rusoto_signature::Region;
 
-use crate::cryptoblob::Key;
+use crate::cryptoblob::*;
 use crate::time::now_msec;
 
+const SAVE_STATE_EVERY: usize = 64;
+
+// Checkpointing interval constants: a checkpoint is not made earlier
+// than CHECKPOINT_INTERVAL time after the last one, and is not made
+// if there are less than CHECKPOINT_MIN_OPS new operations since last one.
+const CHECKPOINT_INTERVAL: Duration = Duration::from_secs(3600);
+const CHECKPOINT_MIN_OPS: usize = 16;
+
 pub trait BayouState:
     Default + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static
 {
-    type Op: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static;
+    type Op: Clone + Serialize + for<'de> Deserialize<'de> + std::fmt::Debug + Send + Sync + 'static;
 
     fn apply(&self, op: &Self::Op) -> Self;
 }
@@ -31,6 +40,7 @@ pub struct Bayou<S: BayouState> {
 
     checkpoint: (Timestamp, S),
     history: Vec<(Timestamp, S::Op, Option<S>)>,
+    last_sync: Option<Instant>,
 }
 
 impl<S: BayouState> Bayou<S> {
@@ -59,23 +69,231 @@ impl<S: BayouState> Bayou<S> {
             s3: s3_client,
             checkpoint: (Timestamp::zero(), S::default()),
             history: vec![],
+            last_sync: None,
         })
     }
 
     /// Re-reads the state from persistent storage backend
     pub async fn sync(&mut self) -> Result<()> {
         // 1. List checkpoints
+        let prefix = format!("{}/checkpoint/", self.path);
+
+        let mut lor = ListObjectsV2Request::default();
+        lor.bucket = self.bucket.clone();
+        lor.max_keys = Some(1000);
+        lor.prefix = Some(prefix.clone());
+
+        let checkpoints_res = self.s3.list_objects_v2(lor).await?;
+
+        let mut checkpoints = vec![];
+        for object in checkpoints_res.contents.unwrap_or_default() {
+            if let Some(key) = object.key {
+                if let Some(ckid) = key.strip_prefix(&prefix) {
+                    if let Some(ts) = Timestamp::parse(ckid) {
+                        checkpoints.push((ts, key));
+                    }
+                }
+            }
+        }
+        checkpoints.sort_by_key(|(ts, _)| *ts);
+        eprintln!("(sync) listed checkpoints: {:?}", checkpoints);
+
         // 2. Load last checkpoint if different from currently used one
+        let checkpoint = if let Some((ts, key)) = checkpoints.last() {
+            if *ts == self.checkpoint.0 {
+                (*ts, None)
+            } else {
+                eprintln!("(sync) loading checkpoint: {}", key);
+
+                let mut gor = GetObjectRequest::default();
+                gor.bucket = self.bucket.clone();
+                gor.key = key.to_string();
+                let obj_res = self.s3.get_object(gor).await?;
+
+                let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?;
+                let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize);
+                obj_body.into_async_read().read_to_end(&mut buf).await?;
+
+                let ck = open_deserialize::<S>(&buf, &self.key)?;
+                (*ts, Some(ck))
+            }
+        } else {
+            (Timestamp::zero(), None)
+        };
+
+        if self.checkpoint.0 > checkpoint.0 {
+            bail!("Existing checkpoint is more recent than stored one");
+        }
+
+        if let Some(ck) = checkpoint.1 {
+            eprintln!(
+                "(sync) updating checkpoint to loaded state at {:?}",
+                checkpoint.0
+            );
+            self.checkpoint = (checkpoint.0, ck);
+        };
+
+        // remove from history events before checkpoint
+        self.history = std::mem::take(&mut self.history)
+            .into_iter()
+            .skip_while(|(ts, _, _)| *ts < self.checkpoint.0)
+            .collect();
+
         // 3. List all operations starting from checkpoint
+        let ts_ser = self.checkpoint.0.serialize();
+        eprintln!("(sync) looking up operations starting at {}", ts_ser);
+        let ops_map = self
+            .k2v
+            .read_batch(&[BatchReadOp {
+                partition_key: &self.path,
+                filter: Filter {
+                    start: Some(&ts_ser),
+                    end: None,
+                    prefix: None,
+                    limit: None,
+                    reverse: false,
+                },
+                single_item: false,
+                conflicts_only: false,
+                include_tombstones: false,
+            }])
+            .await?
+            .into_iter()
+            .next()
+            .ok_or(anyhow!("Missing K2V result"))?
+            .items;
+
+        let mut ops = vec![];
+        for (tsstr, val) in ops_map {
+            let ts = Timestamp::parse(&tsstr)
+                .ok_or(anyhow!("Invalid operation timestamp: {}", tsstr))?;
+            if val.value.len() != 1 {
+                bail!("Invalid operation, has {} values", val.value.len());
+            }
+            match &val.value[0] {
+                K2vValue::Value(v) => {
+                    let op = open_deserialize::<S::Op>(&v, &self.key)?;
+                    eprintln!("(sync) operation {}: {} {:?}", tsstr, base64::encode(v), op);
+                    ops.push((ts, op));
+                }
+                K2vValue::Tombstone => {
+                    unreachable!();
+                }
+            }
+        }
+        ops.sort_by_key(|(ts, _)| *ts);
+        eprintln!("(sync) {} operations", ops.len());
+
+        // if no operations, clean up and return now
+        if ops.is_empty() {
+            self.history.clear();
+            return Ok(());
+        }
+
         // 4. Check that first operation has same timestamp as checkpoint (if not zero)
+        if self.checkpoint.0 != Timestamp::zero() && ops[0].0 != self.checkpoint.0 {
+            bail!(
+                "First operation in listing doesn't have timestamp that corresponds to checkpoint"
+            );
+        }
+
         // 5. Apply all operations in order
-        unimplemented!()
+        // Hypothesis: before the loaded checkpoint, operations haven't changed
+        // between what's on storage and what we used to calculate the state in RAM here.
+        let i0 = self
+            .history
+            .iter()
+            .enumerate()
+            .zip(ops.iter())
+            .skip_while(|((i, (ts1, _, _)), (ts2, _))| ts1 == ts2)
+            .map(|((i, _), _)| i)
+            .next()
+            .unwrap_or(self.history.len());
+
+        if ops.len() > i0 {
+            // Remove operations from first position where histories differ
+            self.history.truncate(i0);
+
+            // Look up last calculated state which we have saved and start from there.
+            let mut last_state = (0, &self.checkpoint.1);
+            for (i, (_, _, state_opt)) in self.history.iter().enumerate().rev() {
+                if let Some(state) = state_opt {
+                    last_state = (i + 1, state);
+                    break;
+                }
+            }
+
+            // Calculate state at the end of this common part of the history
+            let mut state = last_state.1.clone();
+            for (_, op, _) in self.history[last_state.0..].iter() {
+                state = state.apply(op);
+            }
+
+            // Now, apply all operations retrieved from storage after the common part
+            for (ts, op) in ops.drain(i0..) {
+                state = state.apply(&op);
+                if (self.history.len() + 1) % SAVE_STATE_EVERY == 0 {
+                    self.history.push((ts, op, Some(state.clone())));
+                } else {
+                    self.history.push((ts, op, None));
+                }
+            }
+
+            // Always save final state as result of last operation
+            self.history.last_mut().unwrap().2 = Some(state);
+        }
+
+        self.last_sync = Some(Instant::now());
+        Ok(())
+    }
+
+    async fn check_recent_sync(&mut self) -> Result<()> {
+        match self.last_sync {
+            Some(t) if (Instant::now() - t) < CHECKPOINT_INTERVAL / 10 => Ok(()),
+            _ => self.sync().await,
+        }
     }
 
     /// Applies a new operation on the state. Once this function returns,
     /// the option has been safely persisted to storage backend
     pub async fn push(&mut self, op: S::Op) -> Result<()> {
-        unimplemented!()
+        self.check_recent_sync().await?;
+
+        let ts = Timestamp::after(
+            self.history
+                .last()
+                .map(|(ts, _, _)| ts)
+                .unwrap_or(&self.checkpoint.0),
+        );
+        self.k2v
+            .insert_item(
+                &self.path,
+                &ts.serialize(),
+                seal_serialize(&op, &self.key)?,
+                None,
+            )
+            .await?;
+
+        let new_state = self.state().apply(&op);
+        self.history.push((ts, op, Some(new_state)));
+
+        // Clear previously saved state in history if not required
+        let hlen = self.history.len();
+        if hlen >= 2 && (hlen - 1) % SAVE_STATE_EVERY != 0 {
+            self.history[hlen - 2].2 = None;
+        }
+
+        self.checkpoint().await?;
+
+        Ok(())
+    }
+
+    /// Save a new checkpoint if previous checkpoint is too old
+    pub async fn checkpoint(&mut self) -> Result<()> {
+        self.check_recent_sync().await?;
+
+        eprintln!("Mock checkpointing, not implemented");
+        Ok(())
     }
 
     pub fn state(&self) -> &S {
@@ -87,7 +305,7 @@ impl<S: BayouState> Bayou<S> {
     }
 }
 
-#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
 pub struct Timestamp {
     pub msec: u64,
     pub rand: u64,
diff --git a/src/main.rs b/src/main.rs
index 81cf220..461cb11 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,13 +1,14 @@
-use anyhow::Result;
-
-use rusoto_credential::{EnvironmentProvider, ProvideAwsCredentials};
-use rusoto_signature::Region;
-
 mod bayou;
 mod cryptoblob;
 mod time;
 mod uidindex;
 
+use anyhow::Result;
+
+use rand::prelude::*;
+use rusoto_credential::{EnvironmentProvider, ProvideAwsCredentials};
+use rusoto_signature::Region;
+
 use bayou::*;
 use cryptoblob::Key;
 use uidindex::*;
@@ -32,21 +33,48 @@ async fn do_stuff() -> Result<()> {
 
     let key = Key::from_slice(&[0u8; 32]).unwrap();
 
-    let mut mail_index = Bayou::<UidIndex>::new(
+    let mut uid_index = Bayou::<UidIndex>::new(
         creds,
         k2v_region,
         s3_region,
-        "alex".into(),
+        "mail".into(),
         "TestMailbox".into(),
         key,
     )?;
 
-    mail_index.sync().await?;
+    uid_index.sync().await?;
+
+    dump(&uid_index);
 
-    let add_mail_op = mail_index
+    let mut rand_id = [0u8; 24];
+    rand_id[..8].copy_from_slice(&u64::to_be_bytes(thread_rng().gen()));
+    let add_mail_op = uid_index
         .state()
-        .op_mail_add(MailUuid([0xFFu8; 24]), vec!["\\Unseen".into()]);
-    mail_index.push(add_mail_op).await?;
+        .op_mail_add(MailUuid(rand_id), vec!["\\Unseen".into()]);
+    uid_index.push(add_mail_op).await?;
+
+    dump(&uid_index);
 
     Ok(())
 }
+
+fn dump(uid_index: &Bayou<UidIndex>) {
+    let s = uid_index.state();
+    println!("---- MAILBOX STATE ----");
+    println!("UIDVALIDITY {}", s.uidvalidity);
+    println!("UIDNEXT {}", s.uidnext);
+    println!("INTERNALSEQ {}", s.internalseq);
+    for (uid, uuid) in s.mails_by_uid.iter() {
+        println!(
+            "{} {} {}",
+            uid,
+            hex::encode(uuid.0),
+            s.mail_flags
+                .get(uuid)
+                .cloned()
+                .unwrap_or_default()
+                .join(", ")
+        );
+    }
+    println!("");
+}
diff --git a/src/uidindex.rs b/src/uidindex.rs
index 600cf6a..1e30190 100644
--- a/src/uidindex.rs
+++ b/src/uidindex.rs
@@ -9,22 +9,22 @@ type ImapUidvalidity = u32;
 /// A Mail UUID is composed of two components:
 /// - a process identifier, 128 bits
 /// - a sequence number, 64 bits
-#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq)]
+#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Debug)]
 pub struct MailUuid(pub [u8; 24]);
 
 #[derive(Clone)]
 pub struct UidIndex {
-    mail_uid: OrdMap<MailUuid, ImapUid>,
-    mail_flags: OrdMap<MailUuid, Vec<String>>,
+    pub mail_uid: OrdMap<MailUuid, ImapUid>,
+    pub mail_flags: OrdMap<MailUuid, Vec<String>>,
 
-    mails_by_uid: OrdMap<ImapUid, MailUuid>,
+    pub mails_by_uid: OrdMap<ImapUid, MailUuid>,
 
-    uidvalidity: ImapUidvalidity,
-    uidnext: ImapUid,
-    internalseq: ImapUid,
+    pub uidvalidity: ImapUidvalidity,
+    pub uidnext: ImapUid,
+    pub internalseq: ImapUid,
 }
 
-#[derive(Clone, Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize, Debug)]
 pub enum UidIndexOp {
     MailAdd(MailUuid, ImapUid, Vec<String>),
     MailDel(MailUuid),
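
For reference, a minimal sketch of a state type implementing the BayouState trait introduced above. The Counter and CounterOp names are hypothetical and not part of this commit; the real implementor is UidIndex in src/uidindex.rs.

use serde::{Deserialize, Serialize};

// Hypothetical example state: a replicated counter.
#[derive(Default, Clone, Serialize, Deserialize)]
pub struct Counter {
    pub value: i64,
}

// Operations must be (de)serializable and Debug, so they can be stored
// encrypted in K2V and printed by the logging in sync().
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum CounterOp {
    Add(i64),
}

impl BayouState for Counter {
    type Op = CounterOp;

    // apply() is a pure function: it returns a new state instead of mutating
    // in place, so Bayou can replay history from any saved snapshot.
    fn apply(&self, op: &Self::Op) -> Self {
        let mut next = self.clone();
        match op {
            CounterOp::Add(x) => next.value += x,
        }
        next
    }
}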
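
The core of the new sync() is the history merge. A simplified, self-contained model of that step is sketched below; merge_histories is an illustration only, operating on plain strings instead of (Timestamp, Op, Option<S>) entries.

// Simplified model of the merge in sync(): keep the common prefix of the
// local history and the operations read back from storage, drop the local
// tail after the first divergence, and append the stored tail instead.
fn merge_histories<T: Clone + PartialEq>(local: &mut Vec<T>, stored: &[T]) {
    // First index at which the two histories disagree (i0 in the patch).
    let i0 = local
        .iter()
        .zip(stored.iter())
        .take_while(|(a, b)| a == b)
        .count();

    if stored.len() > i0 {
        local.truncate(i0);
        local.extend_from_slice(&stored[i0..]);
    }
}

fn main() {
    let mut local = vec!["op-a", "op-b", "op-local-only"];
    let stored = ["op-a", "op-b", "op-c", "op-d"];
    merge_histories(&mut local, &stored);
    assert_eq!(local, vec!["op-a", "op-b", "op-c", "op-d"]);
}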
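
Both sync() and push() maintain the same invariant about which history entries carry a full Some(state) snapshot: every SAVE_STATE_EVERY-th entry keeps one, and the most recent entry always does, so state() stays cheap. The keeps_snapshot helper below is a sketch of that rule, not code from the patch.

const SAVE_STATE_EVERY: usize = 64;

// An entry keeps its snapshot if it is the last one in the history or sits on
// a SAVE_STATE_EVERY boundary; push() clears the snapshot of the previously
// last entry when it falls on neither.
fn keeps_snapshot(index: usize, history_len: usize) -> bool {
    let is_last = index + 1 == history_len;
    let on_boundary = (index + 1) % SAVE_STATE_EVERY == 0;
    is_last || on_boundary
}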