From 916b27d87ec7f5bff41f9dd888914d50ae067fc0 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 15 Nov 2023 15:56:43 +0100 Subject: WIP refactor storage (new timestamp.rs file) --- src/bayou.rs | 94 +++++++----------------------------------------- src/mail/incoming.rs | 3 +- src/mail/mailbox.rs | 4 +-- src/mail/unique_ident.rs | 2 +- src/mail/user.rs | 3 +- src/main.rs | 2 +- src/storage/garage.rs | 8 +++++ src/storage/in_memory.rs | 8 +++++ src/storage/mod.rs | 3 +- src/time.rs | 9 ----- src/timestamp.rs | 65 +++++++++++++++++++++++++++++++++ 11 files changed, 101 insertions(+), 100 deletions(-) delete mode 100644 src/time.rs create mode 100644 src/timestamp.rs (limited to 'src') diff --git a/src/bayou.rs b/src/bayou.rs index 9f70017..3201783 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -15,9 +15,9 @@ use rusoto_s3::{ }; use crate::cryptoblob::*; -use crate::k2v_util::k2v_wait_value_changed; use crate::login::Credentials; -use crate::time::now_msec; +use crate::timestamp::*; +use crate::storage; const KEEP_STATE_EVERY: usize = 64; @@ -48,12 +48,11 @@ pub trait BayouState: } pub struct Bayou { - bucket: String, path: String, key: Key, - k2v: K2vClient, - s3: S3Client, + k2v: storage::RowStore, + s3: storage::BlobStore, checkpoint: (Timestamp, S), history: Vec<(Timestamp, S::Op, Option)>, @@ -67,13 +66,12 @@ pub struct Bayou { impl Bayou { pub fn new(creds: &Credentials, path: String) -> Result { - let k2v_client = creds.k2v_client()?; - let s3_client = creds.s3_client()?; + let k2v_client = creds.row_client()?; + let s3_client = creds.blob_client()?; let watch = K2vWatch::new(creds, path.clone(), WATCH_SK.to_string())?; Ok(Self { - bucket: creds.bucket().to_string(), path, key: creds.keys.master.clone(), k2v: k2v_client, @@ -103,17 +101,8 @@ impl Bayou { } else { debug!("(sync) loading checkpoint: {}", key); - let gor = GetObjectRequest { - bucket: self.bucket.clone(), - key: key.to_string(), - ..Default::default() - }; - - let obj_res = self.s3.get_object(gor).await?; - - let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?; - let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize); - obj_body.into_async_read().read_to_end(&mut buf).await?; + let obj_res = self.s3.blob(key).fetch().await?; + let buf = obj_res.content().ok_or(anyhow!("object can't be empty"))?; debug!("(sync) checkpoint body length: {}", buf.len()); @@ -145,7 +134,8 @@ impl Bayou { // 3. List all operations starting from checkpoint let ts_ser = self.checkpoint.0.to_string(); debug!("(sync) looking up operations starting at {}", ts_ser); - let ops_map = self + let ops_map = self.k2v.select(storage::Selector::Range { begin: &ts_ser, end: WATCH_SK }).await?; + /*let ops_map = self .k2v .read_batch(&[BatchReadOp { partition_key: &self.path, @@ -164,13 +154,11 @@ impl Bayou { .into_iter() .next() .ok_or(anyhow!("Missing K2V result"))? - .items; + .items;*/ let mut ops = vec![]; - for (tsstr, val) in ops_map { - let ts = tsstr - .parse::() - .map_err(|_| anyhow!("Invalid operation timestamp: {}", tsstr))?; + for row_value in ops_map { + let ts = row_value.timestamp(); if val.value.len() != 1 { bail!("Invalid operation, has {} values", val.value.len()); } @@ -536,59 +524,3 @@ impl K2vWatch { info!("bayou k2v watch bg loop exiting"); } } - -// ---- TIMESTAMP CLASS ---- - -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)] -pub struct Timestamp { - pub msec: u64, - pub rand: u64, -} - -impl Timestamp { - #[allow(dead_code)] - // 2023-05-15 try to make clippy happy and not sure if this fn will be used in the future. - pub fn now() -> Self { - let mut rng = thread_rng(); - Self { - msec: now_msec(), - rand: rng.gen::(), - } - } - - pub fn after(other: &Self) -> Self { - let mut rng = thread_rng(); - Self { - msec: std::cmp::max(now_msec(), other.msec + 1), - rand: rng.gen::(), - } - } - - pub fn zero() -> Self { - Self { msec: 0, rand: 0 } - } -} - -impl ToString for Timestamp { - fn to_string(&self) -> String { - let mut bytes = [0u8; 16]; - bytes[0..8].copy_from_slice(&u64::to_be_bytes(self.msec)); - bytes[8..16].copy_from_slice(&u64::to_be_bytes(self.rand)); - hex::encode(bytes) - } -} - -impl FromStr for Timestamp { - type Err = &'static str; - - fn from_str(s: &str) -> Result { - let bytes = hex::decode(s).map_err(|_| "invalid hex")?; - if bytes.len() != 16 { - return Err("bad length"); - } - Ok(Self { - msec: u64::from_be_bytes(bytes[0..8].try_into().unwrap()), - rand: u64::from_be_bytes(bytes[8..16].try_into().unwrap()), - }) - } -} diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index e550e98..c3a9390 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -15,7 +15,6 @@ use tokio::sync::watch; use tracing::{error, info, warn}; use crate::cryptoblob; -use crate::k2v_util::k2v_wait_value_changed; use crate::login::{Credentials, PublicCredentials}; use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; @@ -23,7 +22,7 @@ use crate::mail::unique_ident::*; use crate::mail::user::User; use crate::mail::IMF; use crate::storage; -use crate::time::now_msec; +use crate::timestamp::now_msec; const INCOMING_PK: &str = "incoming"; const INCOMING_LOCK_SK: &str = "lock"; diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index e8111df..f27d50a 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -9,7 +9,7 @@ use crate::mail::uidindex::*; use crate::mail::unique_ident::*; use crate::mail::IMF; use crate::storage::{RowStore, BlobStore, self}; -use crate::time::now_msec; +use crate::timestamp::now_msec; pub struct Mailbox { pub(super) id: UniqueIdent, @@ -227,7 +227,7 @@ impl MailboxInternal { if let Some(meta) = meta_opt { meta_vec.push(meta); } else { - bail!("No valid meta value in k2v for {:?}", res.to_ref().sk()); + bail!("No valid meta value in k2v for {:?}", res.to_ref().key()); } } diff --git a/src/mail/unique_ident.rs b/src/mail/unique_ident.rs index 267f66e..0e629db 100644 --- a/src/mail/unique_ident.rs +++ b/src/mail/unique_ident.rs @@ -5,7 +5,7 @@ use lazy_static::lazy_static; use rand::prelude::*; use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; -use crate::time::now_msec; +use crate::timestamp::now_msec; /// An internal Mail Identifier is composed of two components: /// - a process identifier, 128 bits, itself composed of: diff --git a/src/mail/user.rs b/src/mail/user.rs index 3b8d4e7..6d3bc1a 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -2,7 +2,6 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::{Arc, Weak}; use anyhow::{anyhow, bail, Result}; -use k2v_client::{CausalityToken, K2vClient, K2vValue}; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use tokio::sync::watch; @@ -14,7 +13,7 @@ use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; use crate::mail::unique_ident::{gen_ident, UniqueIdent}; use crate::storage; -use crate::time::now_msec; +use crate::timestamp::now_msec; pub const MAILBOX_HIERARCHY_DELIMITER: char = '.'; diff --git a/src/main.rs b/src/main.rs index 8d2a140..f395143 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ #![feature(async_fn_in_trait)] +mod timestamp; mod bayou; mod config; mod cryptoblob; @@ -10,7 +11,6 @@ mod login; mod mail; mod server; mod storage; -mod time; use std::path::PathBuf; diff --git a/src/storage/garage.rs b/src/storage/garage.rs index 46da4aa..0abeb4d 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -24,6 +24,10 @@ impl IRowStore for GrgStore { fn row(&self, partition: &str, sort: &str) -> RowRef { unimplemented!(); } + + fn select(&self, selector: Selector) -> AsyncResult> { + unimplemented!(); + } } impl IRowRef for GrgRef { @@ -31,6 +35,10 @@ impl IRowRef for GrgRef { unimplemented!(); } + fn key(&self) -> (&str, &str) { + unimplemented!(); + } + fn set_value(&self, content: Vec) -> RowValue { unimplemented!(); } diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs index 144a52f..8db4eff 100644 --- a/src/storage/in_memory.rs +++ b/src/storage/in_memory.rs @@ -25,9 +25,17 @@ impl IRowStore for MemStore { fn row(&self, partition: &str, sort: &str) -> RowRef { unimplemented!(); } + + fn select(&self, selector: Selector) -> AsyncResult> { + unimplemented!(); + } } impl IRowRef for MemRef { + fn key(&self) -> (&str, &str) { + unimplemented!(); + } + fn clone_boxed(&self) -> RowRef { unimplemented!(); } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index b687959..c3bf19f 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -86,8 +86,7 @@ pub type RowStore = Box; pub trait IRowRef { fn clone_boxed(&self) -> RowRef; - fn pk(&self) -> &str; - fn sk(&self) -> &str; + fn key(&self) -> (&str, &str); fn set_value(&self, content: Vec) -> RowValue; fn fetch(&self) -> AsyncResult; fn rm(&self) -> AsyncResult<()>; diff --git a/src/time.rs b/src/time.rs deleted file mode 100644 index d34ee22..0000000 --- a/src/time.rs +++ /dev/null @@ -1,9 +0,0 @@ -use std::time::{SystemTime, UNIX_EPOCH}; - -/// Returns milliseconds since UNIX Epoch -pub fn now_msec() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Fix your clock :o") - .as_millis() as u64 -} diff --git a/src/timestamp.rs b/src/timestamp.rs new file mode 100644 index 0000000..76cb74b --- /dev/null +++ b/src/timestamp.rs @@ -0,0 +1,65 @@ +use rand::prelude::*; +use std::str::FromStr; +use std::time::{SystemTime, UNIX_EPOCH}; + +/// Returns milliseconds since UNIX Epoch +pub fn now_msec() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Fix your clock :o") + .as_millis() as u64 +} + +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)] +pub struct Timestamp { + pub msec: u64, + pub rand: u64, +} + +impl Timestamp { + #[allow(dead_code)] + // 2023-05-15 try to make clippy happy and not sure if this fn will be used in the future. + pub fn now() -> Self { + let mut rng = thread_rng(); + Self { + msec: now_msec(), + rand: rng.gen::(), + } + } + + pub fn after(other: &Self) -> Self { + let mut rng = thread_rng(); + Self { + msec: std::cmp::max(now_msec(), other.msec + 1), + rand: rng.gen::(), + } + } + + pub fn zero() -> Self { + Self { msec: 0, rand: 0 } + } +} + +impl ToString for Timestamp { + fn to_string(&self) -> String { + let mut bytes = [0u8; 16]; + bytes[0..8].copy_from_slice(&u64::to_be_bytes(self.msec)); + bytes[8..16].copy_from_slice(&u64::to_be_bytes(self.rand)); + hex::encode(bytes) + } +} + +impl FromStr for Timestamp { + type Err = &'static str; + + fn from_str(s: &str) -> Result { + let bytes = hex::decode(s).map_err(|_| "invalid hex")?; + if bytes.len() != 16 { + return Err("bad length"); + } + Ok(Self { + msec: u64::from_be_bytes(bytes[0..8].try_into().unwrap()), + rand: u64::from_be_bytes(bytes[8..16].try_into().unwrap()), + }) + } +} -- cgit v1.2.3