aboutsummaryrefslogtreecommitdiff
path: root/src/table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table.rs')
-rw-r--r--src/table.rs24
1 files changed, 21 insertions, 3 deletions
diff --git a/src/table.rs b/src/table.rs
index 99ac77bb..162f98e6 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -306,7 +306,20 @@ impl<F: TableSchema + 'static> Table<F> {
Ok(resps_vals)
}
- async fn handle(&self, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> {
+ pub async fn rpc_call(&self, who: &UUID, rpc: &TableRPC<F>) -> Result<TableRPC<F>, Error> {
+ let rpc_bytes = rmp_to_vec_all_named(rpc)?;
+ let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes);
+
+ let resp = rpc_call(self.system.clone(), who, &rpc_msg, self.param.timeout).await?;
+ if let Message::TableRPC(tbl, rep_by) = &resp {
+ if *tbl == self.name {
+ return Ok(rmp_serde::decode::from_read_ref(&rep_by)?);
+ }
+ }
+ Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp)))
+ }
+
+ async fn handle(self: &Arc<Self>, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> {
match msg {
TableRPC::ReadEntry(key, sort_key) => {
let value = self.handle_read_entry(&key, &sort_key)?;
@@ -334,7 +347,7 @@ impl<F: TableSchema + 'static> Table<F> {
}
}
- async fn handle_update(&self, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> {
+ async fn handle_update(self: &Arc<Self>, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> {
for update_bytes in entries.drain(..) {
let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?;
@@ -360,7 +373,12 @@ impl<F: TableSchema + 'static> Table<F> {
Ok((old_entry, new_entry))
})?;
- self.instance.updated(old_entry, new_entry).await;
+ if old_entry.as_ref() != Some(&new_entry) {
+ self.instance.updated(old_entry, new_entry).await;
+
+ let syncer = self.syncer.read().await.as_ref().unwrap().clone();
+ self.system.background.spawn(syncer.invalidate(tree_key));
+ }
}
Ok(())
}