diff options
author | Quentin Dufour <quentin@deuxfleurs.fr> | 2023-12-27 14:58:28 +0100 |
---|---|---|
committer | Quentin Dufour <quentin@deuxfleurs.fr> | 2023-12-27 14:58:28 +0100 |
commit | 7ac24ad913fa081e1bd6f5b042b9da0173dad267 (patch) | |
tree | e8cb9d877ea0df530f6699d7d96dced66ff079d6 /src/bayou.rs | |
parent | 54c9736a247bb3534a285caa637c9afb052bc2dd (diff) | |
download | aerogramme-7ac24ad913fa081e1bd6f5b042b9da0173dad267.tar.gz aerogramme-7ac24ad913fa081e1bd6f5b042b9da0173dad267.zip |
cargo format
Diffstat (limited to 'src/bayou.rs')
-rw-r--r-- | src/bayou.rs | 54 |
1 files changed, 31 insertions, 23 deletions
diff --git a/src/bayou.rs b/src/bayou.rs index 1361e49..7253a30 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -9,9 +9,8 @@ use tokio::sync::{watch, Notify}; use crate::cryptoblob::*; use crate::login::Credentials; -use crate::timestamp::*; use crate::storage; - +use crate::timestamp::*; const KEEP_STATE_EVERY: usize = 64; @@ -94,7 +93,11 @@ impl<S: BayouState> Bayou<S> { } else { debug!("(sync) loading checkpoint: {}", key); - let buf = self.storage.blob_fetch(&storage::BlobRef(key.to_string())).await?.value; + let buf = self + .storage + .blob_fetch(&storage::BlobRef(key.to_string())) + .await? + .value; debug!("(sync) checkpoint body length: {}", buf.len()); let ck = open_deserialize::<S>(&buf, &self.key)?; @@ -125,17 +128,22 @@ impl<S: BayouState> Bayou<S> { // 3. List all operations starting from checkpoint let ts_ser = self.checkpoint.0.to_string(); debug!("(sync) looking up operations starting at {}", ts_ser); - let ops_map = self.storage.row_fetch(&storage::Selector::Range { - shard: &self.path, - sort_begin: &ts_ser, - sort_end: WATCH_SK - }).await?; + let ops_map = self + .storage + .row_fetch(&storage::Selector::Range { + shard: &self.path, + sort_begin: &ts_ser, + sort_end: WATCH_SK, + }) + .await?; let mut ops = vec![]; for row_value in ops_map { let row = row_value.row_ref; let sort_key = row.uid.sort; - let ts = sort_key.parse::<Timestamp>().map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?; + let ts = sort_key + .parse::<Timestamp>() + .map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?; let val = row_value.value; if val.len() != 1 { @@ -211,7 +219,7 @@ impl<S: BayouState> Bayou<S> { // Save info that sync has been done self.last_sync = new_last_sync; - self.last_sync_watch_ct = new_last_sync_watch_ct; + self.last_sync_watch_ct = new_last_sync_watch_ct; Ok(()) } @@ -362,16 +370,20 @@ impl<S: BayouState> Bayou<S> { // Delete blobs for (_ts, key) in existing_checkpoints[..last_to_keep].iter() { debug!("(cp) drop old checkpoint {}", key); - self.storage.blob_rm(&storage::BlobRef(key.to_string())).await?; + self.storage + .blob_rm(&storage::BlobRef(key.to_string())) + .await?; } // Delete corresponding range of operations let ts_ser = existing_checkpoints[last_to_keep].0.to_string(); - self.storage.row_rm(&storage::Selector::Range { - shard: &self.path, - sort_begin: "", - sort_end: &ts_ser - }).await? + self.storage + .row_rm(&storage::Selector::Range { + shard: &self.path, + sort_begin: "", + sort_end: &ts_ser, + }) + .await? } Ok(()) @@ -426,11 +438,7 @@ impl K2vWatch { let watch = Arc::new(K2vWatch { target, rx, notify }); - tokio::spawn(Self::background_task( - Arc::downgrade(&watch), - storage, - tx, - )); + tokio::spawn(Self::background_task(Arc::downgrade(&watch), storage, tx)); Ok(watch) } @@ -444,8 +452,8 @@ impl K2vWatch { Some(this) => this.target.clone(), None => { error!("can't start loop"); - return - }, + return; + } }; while let Some(this) = Weak::upgrade(&self_weak) { |