diff options
author | Alex Auvolat <alex@adnab.me> | 2022-05-18 12:24:37 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-05-18 12:26:50 +0200 |
commit | 7a3ce9f81963cc374271272bfe4e0e204e9b7012 (patch) | |
tree | 369a1c390d5aeb5f3ce2515b677affca366cc328 /src/bayou.rs | |
download | aerogramme-7a3ce9f81963cc374271272bfe4e0e204e9b7012.tar.gz aerogramme-7a3ce9f81963cc374271272bfe4e0e204e9b7012.zip |
Skeleton for some stuff
Diffstat (limited to 'src/bayou.rs')
-rw-r--r-- | src/bayou.rs | 123 |
1 files changed, 123 insertions, 0 deletions
diff --git a/src/bayou.rs b/src/bayou.rs new file mode 100644 index 0000000..8fc711e --- /dev/null +++ b/src/bayou.rs @@ -0,0 +1,123 @@ +use std::time::Duration; + +use anyhow::Result; +use rand::prelude::*; +use serde::{Deserialize, Serialize}; + +use k2v_client::K2vClient; +use rusoto_core::HttpClient; +use rusoto_credential::{AwsCredentials, StaticProvider}; +use rusoto_s3::S3Client; +use rusoto_signature::Region; + +use crate::cryptoblob::Key; +use crate::time::now_msec; + +pub trait BayouState: + Default + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static +{ + type Op: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static; + + fn apply(&self, op: &Self::Op) -> Self; +} + +pub struct Bayou<S: BayouState> { + bucket: String, + path: String, + key: Key, + + k2v: K2vClient, + s3: S3Client, + + checkpoint: (Timestamp, S), + history: Vec<(Timestamp, S::Op, Option<S>)>, +} + +impl<S: BayouState> Bayou<S> { + pub fn new( + creds: AwsCredentials, + k2v_region: Region, + s3_region: Region, + bucket: String, + path: String, + key: Key, + ) -> Result<Self> { + let k2v_client = K2vClient::new(k2v_region, bucket.clone(), creds.clone(), None)?; + let static_creds = StaticProvider::new( + creds.aws_access_key_id().to_string(), + creds.aws_secret_access_key().to_string(), + creds.token().clone(), + None, + ); + let s3_client = S3Client::new_with(HttpClient::new()?, static_creds, s3_region); + + Ok(Self { + bucket, + path, + key, + k2v: k2v_client, + s3: s3_client, + checkpoint: (Timestamp::zero(), S::default()), + history: vec![], + }) + } + + pub async fn sync(&mut self) -> Result<()> { + // 1. List checkpoints + // 2. Load last checkpoint + // 3. List all operations starting from checkpoint + // 4. Check that first operation has same timestamp as checkpoint (if not zero) + // 5. Apply all operations in order + unimplemented!() + } + + pub fn state(&self) -> &S { + unimplemented!() + } +} + +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub struct Timestamp { + pub msec: u64, + pub rand: u64, +} + +impl Timestamp { + pub fn now() -> Self { + let mut rng = thread_rng(); + Self { + msec: now_msec(), + rand: rng.gen::<u64>(), + } + } + + pub fn after(other: &Self) -> Self { + let mut rng = thread_rng(); + Self { + msec: std::cmp::max(now_msec(), other.msec + 1), + rand: rng.gen::<u64>(), + } + } + + pub fn zero() -> Self { + Self { msec: 0, rand: 0 } + } + + pub fn serialize(&self) -> String { + let mut bytes = [0u8; 16]; + bytes[0..8].copy_from_slice(&u64::to_be_bytes(self.msec)); + bytes[8..16].copy_from_slice(&u64::to_be_bytes(self.rand)); + hex::encode(&bytes) + } + + pub fn parse(v: &str) -> Option<Self> { + let bytes = hex::decode(v).ok()?; + if bytes.len() != 16 { + return None; + } + Some(Self { + msec: u64::from_be_bytes(bytes[0..8].try_into().unwrap()), + rand: u64::from_be_bytes(bytes[8..16].try_into().unwrap()), + }) + } +} |