From 7a3ce9f81963cc374271272bfe4e0e204e9b7012 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 18 May 2022 12:24:37 +0200 Subject: Skeleton for some stuff --- src/bayou.rs | 123 +++++++++++++++++++++++++++++++++++++++ src/cryptoblob.rs | 58 +++++++++++++++++++ src/main.rs | 47 +++++++++++++++ src/time.rs | 9 +++ src/uidindex.rs | 168 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 405 insertions(+) create mode 100644 src/bayou.rs create mode 100644 src/cryptoblob.rs create mode 100644 src/main.rs create mode 100644 src/time.rs create mode 100644 src/uidindex.rs (limited to 'src') diff --git a/src/bayou.rs b/src/bayou.rs new file mode 100644 index 0000000..8fc711e --- /dev/null +++ b/src/bayou.rs @@ -0,0 +1,123 @@ +use std::time::Duration; + +use anyhow::Result; +use rand::prelude::*; +use serde::{Deserialize, Serialize}; + +use k2v_client::K2vClient; +use rusoto_core::HttpClient; +use rusoto_credential::{AwsCredentials, StaticProvider}; +use rusoto_s3::S3Client; +use rusoto_signature::Region; + +use crate::cryptoblob::Key; +use crate::time::now_msec; + +pub trait BayouState: + Default + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static +{ + type Op: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static; + + fn apply(&self, op: &Self::Op) -> Self; +} + +pub struct Bayou { + bucket: String, + path: String, + key: Key, + + k2v: K2vClient, + s3: S3Client, + + checkpoint: (Timestamp, S), + history: Vec<(Timestamp, S::Op, Option)>, +} + +impl Bayou { + pub fn new( + creds: AwsCredentials, + k2v_region: Region, + s3_region: Region, + bucket: String, + path: String, + key: Key, + ) -> Result { + let k2v_client = K2vClient::new(k2v_region, bucket.clone(), creds.clone(), None)?; + let static_creds = StaticProvider::new( + creds.aws_access_key_id().to_string(), + creds.aws_secret_access_key().to_string(), + creds.token().clone(), + None, + ); + let s3_client = S3Client::new_with(HttpClient::new()?, static_creds, s3_region); + + Ok(Self { + bucket, + path, + key, + k2v: k2v_client, + s3: s3_client, + checkpoint: (Timestamp::zero(), S::default()), + history: vec![], + }) + } + + pub async fn sync(&mut self) -> Result<()> { + // 1. List checkpoints + // 2. Load last checkpoint + // 3. List all operations starting from checkpoint + // 4. Check that first operation has same timestamp as checkpoint (if not zero) + // 5. Apply all operations in order + unimplemented!() + } + + pub fn state(&self) -> &S { + unimplemented!() + } +} + +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub struct Timestamp { + pub msec: u64, + pub rand: u64, +} + +impl Timestamp { + pub fn now() -> Self { + let mut rng = thread_rng(); + Self { + msec: now_msec(), + rand: rng.gen::(), + } + } + + pub fn after(other: &Self) -> Self { + let mut rng = thread_rng(); + Self { + msec: std::cmp::max(now_msec(), other.msec + 1), + rand: rng.gen::(), + } + } + + pub fn zero() -> Self { + Self { msec: 0, rand: 0 } + } + + pub fn serialize(&self) -> String { + let mut bytes = [0u8; 16]; + bytes[0..8].copy_from_slice(&u64::to_be_bytes(self.msec)); + bytes[8..16].copy_from_slice(&u64::to_be_bytes(self.rand)); + hex::encode(&bytes) + } + + pub fn parse(v: &str) -> Option { + let bytes = hex::decode(v).ok()?; + if bytes.len() != 16 { + return None; + } + Some(Self { + msec: u64::from_be_bytes(bytes[0..8].try_into().unwrap()), + rand: u64::from_be_bytes(bytes[8..16].try_into().unwrap()), + }) + } +} diff --git a/src/cryptoblob.rs b/src/cryptoblob.rs new file mode 100644 index 0000000..6e51dbb --- /dev/null +++ b/src/cryptoblob.rs @@ -0,0 +1,58 @@ +//! Helper functions for secret-key encrypted blobs +//! that contain Zstd encrypted data + +use anyhow::{anyhow, Result}; +use serde::{Deserialize, Serialize}; +use zstd::stream::{decode_all as zstd_decode, encode_all as zstd_encode}; + +use sodiumoxide::crypto::secretbox::xsalsa20poly1305::{self, gen_nonce, Nonce, NONCEBYTES}; +pub use sodiumoxide::crypto::secretbox::xsalsa20poly1305::{gen_key, Key, KEYBYTES}; + +pub fn open(cryptoblob: &[u8], key: &Key) -> Result> { + if cryptoblob.len() < NONCEBYTES { + return Err(anyhow!("Cyphertext too short")); + } + + // Decrypt -> get Zstd data + let nonce = Nonce::from_slice(&cryptoblob[..NONCEBYTES]).unwrap(); + let zstdblob = xsalsa20poly1305::open(&cryptoblob[NONCEBYTES..], &nonce, key) + .map_err(|_| anyhow!("Could not decrypt blob"))?; + + // Decompress zstd data + let mut reader = &zstdblob[..]; + let data = zstd_decode(&mut reader)?; + + Ok(data) +} + +pub fn seal(plainblob: &[u8], key: &Key) -> Result> { + // Compress data using zstd + let mut reader = &plainblob[..]; + let zstdblob = zstd_encode(&mut reader, 0)?; + + // Encrypt + let nonce = gen_nonce(); + let cryptoblob = xsalsa20poly1305::seal(&zstdblob, &nonce, key); + + let mut res = Vec::with_capacity(NONCEBYTES + cryptoblob.len()); + res.extend(nonce.as_ref()); + res.extend(cryptoblob); + + Ok(res) +} + +pub fn open_deserialize Deserialize<'de>>(cryptoblob: &[u8], key: &Key) -> Result { + let blob = open(cryptoblob, key)?; + + Ok(rmp_serde::decode::from_read_ref::<_, T>(&blob)?) +} + +pub fn seal_serialize(obj: T, key: &Key) -> Result> { + let mut wr = Vec::with_capacity(128); + let mut se = rmp_serde::Serializer::new(&mut wr) + .with_struct_map() + .with_string_variants(); + obj.serialize(&mut se)?; + + Ok(seal(&wr, key)?) +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..2b17e4c --- /dev/null +++ b/src/main.rs @@ -0,0 +1,47 @@ +use anyhow::Result; + +use rusoto_credential::{EnvironmentProvider, ProvideAwsCredentials}; +use rusoto_signature::Region; + +mod bayou; +mod cryptoblob; +mod time; +mod uidindex; + +use bayou::Bayou; +use cryptoblob::Key; +use uidindex::{UidIndex, UidIndexOp}; + +#[tokio::main] +async fn main() { + do_stuff().await.expect("Something failed"); +} + +async fn do_stuff() -> Result<()> { + let creds = EnvironmentProvider::default().credentials().await.unwrap(); + + let k2v_region = Region::Custom { + name: "garage-staging".to_owned(), + endpoint: "https://k2v-staging.home.adnab.me".to_owned(), + }; + + let s3_region = Region::Custom { + name: "garage-staging".to_owned(), + endpoint: "https://garage-staging.home.adnab.me".to_owned(), + }; + + let key = Key::from_slice(&[0u8; 32]).unwrap(); + + let mut mail_index = Bayou::::new( + creds, + k2v_region, + s3_region, + "alex".into(), + "TestMailbox".into(), + key, + )?; + + mail_index.sync().await?; + + Ok(()) +} diff --git a/src/time.rs b/src/time.rs new file mode 100644 index 0000000..d34ee22 --- /dev/null +++ b/src/time.rs @@ -0,0 +1,9 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +/// Returns milliseconds since UNIX Epoch +pub fn now_msec() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Fix your clock :o") + .as_millis() as u64 +} diff --git a/src/uidindex.rs b/src/uidindex.rs new file mode 100644 index 0000000..7c5500f --- /dev/null +++ b/src/uidindex.rs @@ -0,0 +1,168 @@ +use im::OrdMap; +use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; + +use crate::bayou::*; + +type ImapUid = u32; +type ImapUidvalidity = u32; + +/// A Mail UUID is composed of two components: +/// - a process identifier, 128 bits +/// - a sequence number, 64 bits +#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq)] +pub struct MailUuid(pub [u8; 24]); + +#[derive(Clone)] +pub struct UidIndex { + mail_uid: OrdMap, + mail_flags: OrdMap>, + + mails_by_uid: OrdMap, + + uidvalidity: ImapUidvalidity, + uidnext: ImapUid, + internalseq: ImapUid, +} + +#[derive(Clone, Serialize, Deserialize)] +pub enum UidIndexOp { + MailAdd(MailUuid, ImapUid, Vec), + MailDel(MailUuid), +} + +impl Default for UidIndex { + fn default() -> Self { + Self { + mail_flags: OrdMap::new(), + mail_uid: OrdMap::new(), + mails_by_uid: OrdMap::new(), + uidvalidity: 1, + uidnext: 1, + internalseq: 1, + } + } +} + +impl BayouState for UidIndex { + type Op = UidIndexOp; + + fn apply(&self, op: &UidIndexOp) -> Self { + let mut new = self.clone(); + match op { + UidIndexOp::MailAdd(uuid, uid, flags) => { + if *uid < new.internalseq { + new.uidvalidity += new.internalseq - *uid; + } + let new_uid = new.internalseq; + + if let Some(prev_uid) = new.mail_uid.get(uuid) { + new.mails_by_uid.remove(prev_uid); + } else { + new.mail_flags.insert(*uuid, flags.clone()); + } + new.mails_by_uid.insert(new_uid, *uuid); + new.mail_uid.insert(*uuid, new_uid); + + new.internalseq += 1; + new.uidnext = new.internalseq; + } + UidIndexOp::MailDel(uuid) => { + if let Some(uid) = new.mail_uid.get(uuid) { + new.mails_by_uid.remove(uid); + new.mail_uid.remove(uuid); + new.mail_flags.remove(uuid); + } + new.internalseq += 1; + } + } + new + } +} + +// ---- CUSTOM SERIALIZATION AND DESERIALIZATION ---- + +#[derive(Serialize, Deserialize)] +struct UidIndexSerializedRepr { + mails: Vec<(ImapUid, MailUuid, Vec)>, + uidvalidity: ImapUidvalidity, + uidnext: ImapUid, + internalseq: ImapUid, +} + +impl<'de> Deserialize<'de> for UidIndex { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let val: UidIndexSerializedRepr = UidIndexSerializedRepr::deserialize(d)?; + + let mut uidindex = UidIndex { + mail_flags: OrdMap::new(), + mail_uid: OrdMap::new(), + mails_by_uid: OrdMap::new(), + uidvalidity: val.uidvalidity, + uidnext: val.uidnext, + internalseq: val.internalseq, + }; + + for (uid, uuid, flags) in val.mails { + uidindex.mail_flags.insert(uuid, flags); + uidindex.mail_uid.insert(uuid, uid); + uidindex.mails_by_uid.insert(uid, uuid); + } + + Ok(uidindex) + } +} + +impl Serialize for UidIndex { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut mails = vec![]; + for (uid, uuid) in self.mails_by_uid.iter() { + mails.push(( + *uid, + *uuid, + self.mail_flags.get(uuid).cloned().unwrap_or_default(), + )); + } + + let val = UidIndexSerializedRepr { + mails, + uidvalidity: self.uidvalidity, + uidnext: self.uidnext, + internalseq: self.internalseq, + }; + + val.serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for MailUuid { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let v = String::deserialize(d)?; + let bytes = hex::decode(v).map_err(|_| D::Error::custom("invalid hex"))?; + + if bytes.len() != 24 { + return Err(D::Error::custom("bad length")); + } + + let mut tmp = [0u8; 24]; + tmp[..].copy_from_slice(&bytes); + Ok(Self(tmp)) + } +} + +impl Serialize for MailUuid { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&hex::encode(self.0)) + } +} -- cgit v1.2.3