aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/bayou.rs94
-rw-r--r--src/mail/incoming.rs3
-rw-r--r--src/mail/mailbox.rs4
-rw-r--r--src/mail/unique_ident.rs2
-rw-r--r--src/mail/user.rs3
-rw-r--r--src/main.rs2
-rw-r--r--src/storage/garage.rs8
-rw-r--r--src/storage/in_memory.rs8
-rw-r--r--src/storage/mod.rs3
-rw-r--r--src/time.rs9
-rw-r--r--src/timestamp.rs65
11 files changed, 101 insertions, 100 deletions
diff --git a/src/bayou.rs b/src/bayou.rs
index 9f70017..3201783 100644
--- a/src/bayou.rs
+++ b/src/bayou.rs
@@ -15,9 +15,9 @@ use rusoto_s3::{
};
use crate::cryptoblob::*;
-use crate::k2v_util::k2v_wait_value_changed;
use crate::login::Credentials;
-use crate::time::now_msec;
+use crate::timestamp::*;
+use crate::storage;
const KEEP_STATE_EVERY: usize = 64;
@@ -48,12 +48,11 @@ pub trait BayouState:
}
pub struct Bayou<S: BayouState> {
- bucket: String,
path: String,
key: Key,
- k2v: K2vClient,
- s3: S3Client,
+ k2v: storage::RowStore,
+ s3: storage::BlobStore,
checkpoint: (Timestamp, S),
history: Vec<(Timestamp, S::Op, Option<S>)>,
@@ -67,13 +66,12 @@ pub struct Bayou<S: BayouState> {
impl<S: BayouState> Bayou<S> {
pub fn new(creds: &Credentials, path: String) -> Result<Self> {
- let k2v_client = creds.k2v_client()?;
- let s3_client = creds.s3_client()?;
+ let k2v_client = creds.row_client()?;
+ let s3_client = creds.blob_client()?;
let watch = K2vWatch::new(creds, path.clone(), WATCH_SK.to_string())?;
Ok(Self {
- bucket: creds.bucket().to_string(),
path,
key: creds.keys.master.clone(),
k2v: k2v_client,
@@ -103,17 +101,8 @@ impl<S: BayouState> Bayou<S> {
} else {
debug!("(sync) loading checkpoint: {}", key);
- let gor = GetObjectRequest {
- bucket: self.bucket.clone(),
- key: key.to_string(),
- ..Default::default()
- };
-
- let obj_res = self.s3.get_object(gor).await?;
-
- let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?;
- let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize);
- obj_body.into_async_read().read_to_end(&mut buf).await?;
+ let obj_res = self.s3.blob(key).fetch().await?;
+ let buf = obj_res.content().ok_or(anyhow!("object can't be empty"))?;
debug!("(sync) checkpoint body length: {}", buf.len());
@@ -145,7 +134,8 @@ impl<S: BayouState> Bayou<S> {
// 3. List all operations starting from checkpoint
let ts_ser = self.checkpoint.0.to_string();
debug!("(sync) looking up operations starting at {}", ts_ser);
- let ops_map = self
+ let ops_map = self.k2v.select(storage::Selector::Range { begin: &ts_ser, end: WATCH_SK }).await?;
+ /*let ops_map = self
.k2v
.read_batch(&[BatchReadOp {
partition_key: &self.path,
@@ -164,13 +154,11 @@ impl<S: BayouState> Bayou<S> {
.into_iter()
.next()
.ok_or(anyhow!("Missing K2V result"))?
- .items;
+ .items;*/
let mut ops = vec![];
- for (tsstr, val) in ops_map {
- let ts = tsstr
- .parse::<Timestamp>()
- .map_err(|_| anyhow!("Invalid operation timestamp: {}", tsstr))?;
+ for row_value in ops_map {
+ let ts = row_value.timestamp();
if val.value.len() != 1 {
bail!("Invalid operation, has {} values", val.value.len());
}
@@ -536,59 +524,3 @@ impl K2vWatch {
info!("bayou k2v watch bg loop exiting");
}
}
-
-// ---- TIMESTAMP CLASS ----
-
-#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
-pub struct Timestamp {
- pub msec: u64,
- pub rand: u64,
-}
-
-impl Timestamp {
- #[allow(dead_code)]
- // 2023-05-15 try to make clippy happy and not sure if this fn will be used in the future.
- pub fn now() -> Self {
- let mut rng = thread_rng();
- Self {
- msec: now_msec(),
- rand: rng.gen::<u64>(),
- }
- }
-
- pub fn after(other: &Self) -> Self {
- let mut rng = thread_rng();
- Self {
- msec: std::cmp::max(now_msec(), other.msec + 1),
- rand: rng.gen::<u64>(),
- }
- }
-
- pub fn zero() -> Self {
- Self { msec: 0, rand: 0 }
- }
-}
-
-impl ToString for Timestamp {
- fn to_string(&self) -> String {
- let mut bytes = [0u8; 16];
- bytes[0..8].copy_from_slice(&u64::to_be_bytes(self.msec));
- bytes[8..16].copy_from_slice(&u64::to_be_bytes(self.rand));
- hex::encode(bytes)
- }
-}
-
-impl FromStr for Timestamp {
- type Err = &'static str;
-
- fn from_str(s: &str) -> Result<Timestamp, &'static str> {
- let bytes = hex::decode(s).map_err(|_| "invalid hex")?;
- if bytes.len() != 16 {
- return Err("bad length");
- }
- Ok(Self {
- msec: u64::from_be_bytes(bytes[0..8].try_into().unwrap()),
- rand: u64::from_be_bytes(bytes[8..16].try_into().unwrap()),
- })
- }
-}
diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs
index e550e98..c3a9390 100644
--- a/src/mail/incoming.rs
+++ b/src/mail/incoming.rs
@@ -15,7 +15,6 @@ use tokio::sync::watch;
use tracing::{error, info, warn};
use crate::cryptoblob;
-use crate::k2v_util::k2v_wait_value_changed;
use crate::login::{Credentials, PublicCredentials};
use crate::mail::mailbox::Mailbox;
use crate::mail::uidindex::ImapUidvalidity;
@@ -23,7 +22,7 @@ use crate::mail::unique_ident::*;
use crate::mail::user::User;
use crate::mail::IMF;
use crate::storage;
-use crate::time::now_msec;
+use crate::timestamp::now_msec;
const INCOMING_PK: &str = "incoming";
const INCOMING_LOCK_SK: &str = "lock";
diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs
index e8111df..f27d50a 100644
--- a/src/mail/mailbox.rs
+++ b/src/mail/mailbox.rs
@@ -9,7 +9,7 @@ use crate::mail::uidindex::*;
use crate::mail::unique_ident::*;
use crate::mail::IMF;
use crate::storage::{RowStore, BlobStore, self};
-use crate::time::now_msec;
+use crate::timestamp::now_msec;
pub struct Mailbox {
pub(super) id: UniqueIdent,
@@ -227,7 +227,7 @@ impl MailboxInternal {
if let Some(meta) = meta_opt {
meta_vec.push(meta);
} else {
- bail!("No valid meta value in k2v for {:?}", res.to_ref().sk());
+ bail!("No valid meta value in k2v for {:?}", res.to_ref().key());
}
}
diff --git a/src/mail/unique_ident.rs b/src/mail/unique_ident.rs
index 267f66e..0e629db 100644
--- a/src/mail/unique_ident.rs
+++ b/src/mail/unique_ident.rs
@@ -5,7 +5,7 @@ use lazy_static::lazy_static;
use rand::prelude::*;
use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
-use crate::time::now_msec;
+use crate::timestamp::now_msec;
/// An internal Mail Identifier is composed of two components:
/// - a process identifier, 128 bits, itself composed of:
diff --git a/src/mail/user.rs b/src/mail/user.rs
index 3b8d4e7..6d3bc1a 100644
--- a/src/mail/user.rs
+++ b/src/mail/user.rs
@@ -2,7 +2,6 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Weak};
use anyhow::{anyhow, bail, Result};
-use k2v_client::{CausalityToken, K2vClient, K2vValue};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
@@ -14,7 +13,7 @@ use crate::mail::mailbox::Mailbox;
use crate::mail::uidindex::ImapUidvalidity;
use crate::mail::unique_ident::{gen_ident, UniqueIdent};
use crate::storage;
-use crate::time::now_msec;
+use crate::timestamp::now_msec;
pub const MAILBOX_HIERARCHY_DELIMITER: char = '.';
diff --git a/src/main.rs b/src/main.rs
index 8d2a140..f395143 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,6 @@
#![feature(async_fn_in_trait)]
+mod timestamp;
mod bayou;
mod config;
mod cryptoblob;
@@ -10,7 +11,6 @@ mod login;
mod mail;
mod server;
mod storage;
-mod time;
use std::path::PathBuf;
diff --git a/src/storage/garage.rs b/src/storage/garage.rs
index 46da4aa..0abeb4d 100644
--- a/src/storage/garage.rs
+++ b/src/storage/garage.rs
@@ -24,6 +24,10 @@ impl IRowStore for GrgStore {
fn row(&self, partition: &str, sort: &str) -> RowRef {
unimplemented!();
}
+
+ fn select(&self, selector: Selector) -> AsyncResult<Vec<RowValue>> {
+ unimplemented!();
+ }
}
impl IRowRef for GrgRef {
@@ -31,6 +35,10 @@ impl IRowRef for GrgRef {
unimplemented!();
}
+ fn key(&self) -> (&str, &str) {
+ unimplemented!();
+ }
+
fn set_value(&self, content: Vec<u8>) -> RowValue {
unimplemented!();
}
diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs
index 144a52f..8db4eff 100644
--- a/src/storage/in_memory.rs
+++ b/src/storage/in_memory.rs
@@ -25,9 +25,17 @@ impl IRowStore for MemStore {
fn row(&self, partition: &str, sort: &str) -> RowRef {
unimplemented!();
}
+
+ fn select(&self, selector: Selector) -> AsyncResult<Vec<RowValue>> {
+ unimplemented!();
+ }
}
impl IRowRef for MemRef {
+ fn key(&self) -> (&str, &str) {
+ unimplemented!();
+ }
+
fn clone_boxed(&self) -> RowRef {
unimplemented!();
}
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index b687959..c3bf19f 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -86,8 +86,7 @@ pub type RowStore = Box<dyn IRowStore + Sync + Send>;
pub trait IRowRef
{
fn clone_boxed(&self) -> RowRef;
- fn pk(&self) -> &str;
- fn sk(&self) -> &str;
+ fn key(&self) -> (&str, &str);
fn set_value(&self, content: Vec<u8>) -> RowValue;
fn fetch(&self) -> AsyncResult<RowValue>;
fn rm(&self) -> AsyncResult<()>;
diff --git a/src/time.rs b/src/time.rs
deleted file mode 100644
index d34ee22..0000000
--- a/src/time.rs
+++ /dev/null
@@ -1,9 +0,0 @@
-use std::time::{SystemTime, UNIX_EPOCH};
-
-/// Returns milliseconds since UNIX Epoch
-pub fn now_msec() -> u64 {
- SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("Fix your clock :o")
- .as_millis() as u64
-}
diff --git a/src/timestamp.rs b/src/timestamp.rs
new file mode 100644
index 0000000..76cb74b
--- /dev/null
+++ b/src/timestamp.rs
@@ -0,0 +1,65 @@
+use rand::prelude::*;
+use std::str::FromStr;
+use std::time::{SystemTime, UNIX_EPOCH};
+
+/// Returns milliseconds since UNIX Epoch
+pub fn now_msec() -> u64 {
+ SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("Fix your clock :o")
+ .as_millis() as u64
+}
+
+#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
+pub struct Timestamp {
+ pub msec: u64,
+ pub rand: u64,
+}
+
+impl Timestamp {
+ #[allow(dead_code)]
+ // 2023-05-15 try to make clippy happy and not sure if this fn will be used in the future.
+ pub fn now() -> Self {
+ let mut rng = thread_rng();
+ Self {
+ msec: now_msec(),
+ rand: rng.gen::<u64>(),
+ }
+ }
+
+ pub fn after(other: &Self) -> Self {
+ let mut rng = thread_rng();
+ Self {
+ msec: std::cmp::max(now_msec(), other.msec + 1),
+ rand: rng.gen::<u64>(),
+ }
+ }
+
+ pub fn zero() -> Self {
+ Self { msec: 0, rand: 0 }
+ }
+}
+
+impl ToString for Timestamp {
+ fn to_string(&self) -> String {
+ let mut bytes = [0u8; 16];
+ bytes[0..8].copy_from_slice(&u64::to_be_bytes(self.msec));
+ bytes[8..16].copy_from_slice(&u64::to_be_bytes(self.rand));
+ hex::encode(bytes)
+ }
+}
+
+impl FromStr for Timestamp {
+ type Err = &'static str;
+
+ fn from_str(s: &str) -> Result<Timestamp, &'static str> {
+ let bytes = hex::decode(s).map_err(|_| "invalid hex")?;
+ if bytes.len() != 16 {
+ return Err("bad length");
+ }
+ Ok(Self {
+ msec: u64::from_be_bytes(bytes[0..8].try_into().unwrap()),
+ rand: u64::from_be_bytes(bytes[8..16].try_into().unwrap()),
+ })
+ }
+}