aboutsummaryrefslogtreecommitdiff
path: root/src/bayou.rs
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2023-12-27 14:58:28 +0100
committerQuentin Dufour <quentin@deuxfleurs.fr>2023-12-27 14:58:28 +0100
commit7ac24ad913fa081e1bd6f5b042b9da0173dad267 (patch)
treee8cb9d877ea0df530f6699d7d96dced66ff079d6 /src/bayou.rs
parent54c9736a247bb3534a285caa637c9afb052bc2dd (diff)
downloadaerogramme-7ac24ad913fa081e1bd6f5b042b9da0173dad267.tar.gz
aerogramme-7ac24ad913fa081e1bd6f5b042b9da0173dad267.zip
cargo format
Diffstat (limited to 'src/bayou.rs')
-rw-r--r--src/bayou.rs54
1 files changed, 31 insertions, 23 deletions
diff --git a/src/bayou.rs b/src/bayou.rs
index 1361e49..7253a30 100644
--- a/src/bayou.rs
+++ b/src/bayou.rs
@@ -9,9 +9,8 @@ use tokio::sync::{watch, Notify};
use crate::cryptoblob::*;
use crate::login::Credentials;
-use crate::timestamp::*;
use crate::storage;
-
+use crate::timestamp::*;
const KEEP_STATE_EVERY: usize = 64;
@@ -94,7 +93,11 @@ impl<S: BayouState> Bayou<S> {
} else {
debug!("(sync) loading checkpoint: {}", key);
- let buf = self.storage.blob_fetch(&storage::BlobRef(key.to_string())).await?.value;
+ let buf = self
+ .storage
+ .blob_fetch(&storage::BlobRef(key.to_string()))
+ .await?
+ .value;
debug!("(sync) checkpoint body length: {}", buf.len());
let ck = open_deserialize::<S>(&buf, &self.key)?;
@@ -125,17 +128,22 @@ impl<S: BayouState> Bayou<S> {
// 3. List all operations starting from checkpoint
let ts_ser = self.checkpoint.0.to_string();
debug!("(sync) looking up operations starting at {}", ts_ser);
- let ops_map = self.storage.row_fetch(&storage::Selector::Range {
- shard: &self.path,
- sort_begin: &ts_ser,
- sort_end: WATCH_SK
- }).await?;
+ let ops_map = self
+ .storage
+ .row_fetch(&storage::Selector::Range {
+ shard: &self.path,
+ sort_begin: &ts_ser,
+ sort_end: WATCH_SK,
+ })
+ .await?;
let mut ops = vec![];
for row_value in ops_map {
let row = row_value.row_ref;
let sort_key = row.uid.sort;
- let ts = sort_key.parse::<Timestamp>().map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?;
+ let ts = sort_key
+ .parse::<Timestamp>()
+ .map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?;
let val = row_value.value;
if val.len() != 1 {
@@ -211,7 +219,7 @@ impl<S: BayouState> Bayou<S> {
// Save info that sync has been done
self.last_sync = new_last_sync;
- self.last_sync_watch_ct = new_last_sync_watch_ct;
+ self.last_sync_watch_ct = new_last_sync_watch_ct;
Ok(())
}
@@ -362,16 +370,20 @@ impl<S: BayouState> Bayou<S> {
// Delete blobs
for (_ts, key) in existing_checkpoints[..last_to_keep].iter() {
debug!("(cp) drop old checkpoint {}", key);
- self.storage.blob_rm(&storage::BlobRef(key.to_string())).await?;
+ self.storage
+ .blob_rm(&storage::BlobRef(key.to_string()))
+ .await?;
}
// Delete corresponding range of operations
let ts_ser = existing_checkpoints[last_to_keep].0.to_string();
- self.storage.row_rm(&storage::Selector::Range {
- shard: &self.path,
- sort_begin: "",
- sort_end: &ts_ser
- }).await?
+ self.storage
+ .row_rm(&storage::Selector::Range {
+ shard: &self.path,
+ sort_begin: "",
+ sort_end: &ts_ser,
+ })
+ .await?
}
Ok(())
@@ -426,11 +438,7 @@ impl K2vWatch {
let watch = Arc::new(K2vWatch { target, rx, notify });
- tokio::spawn(Self::background_task(
- Arc::downgrade(&watch),
- storage,
- tx,
- ));
+ tokio::spawn(Self::background_task(Arc::downgrade(&watch), storage, tx));
Ok(watch)
}
@@ -444,8 +452,8 @@ impl K2vWatch {
Some(this) => this.target.clone(),
None => {
error!("can't start loop");
- return
- },
+ return;
+ }
};
while let Some(this) = Weak::upgrade(&self_weak) {