aboutsummaryrefslogtreecommitdiff
path: root/src/table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table.rs')
-rw-r--r--src/table.rs11
1 files changed, 8 insertions, 3 deletions
diff --git a/src/table.rs b/src/table.rs
index 6b7d1779..6892c9f5 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -4,9 +4,9 @@ use std::time::Duration;
use async_trait::async_trait;
use futures::stream::*;
-use tokio::sync::RwLock;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
+use tokio::sync::RwLock;
use crate::data::*;
use crate::error::Error;
@@ -316,7 +316,10 @@ impl<F: TableSchema + 'static> Table<F> {
return Ok(rmp_serde::decode::from_read_ref(&rep_by)?);
}
}
- Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp)))
+ Err(Error::Message(format!(
+ "Invalid reply to TableRPC: {:?}",
+ resp
+ )))
}
async fn handle(self: &Arc<Self>, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> {
@@ -331,7 +334,9 @@ impl<F: TableSchema + 'static> Table<F> {
}
TableRPC::SyncChecksums(checksums) => {
let syncer = self.syncer.read().await.as_ref().unwrap().clone();
- let differing = syncer.handle_checksum_rpc(&checksums[..], self.system.background.stop_signal.clone()).await?;
+ let differing = syncer
+ .handle_checksum_rpc(&checksums[..], self.system.background.stop_signal.clone())
+ .await?;
Ok(TableRPC::SyncDifferentSet(differing))
}
_ => Err(Error::RPCError(format!("Unexpected table RPC"))),