diff options
Diffstat (limited to 'src/table.rs')
-rw-r--r-- | src/table.rs | 24 |
1 files changed, 21 insertions, 3 deletions
diff --git a/src/table.rs b/src/table.rs index 99ac77bb..162f98e6 100644 --- a/src/table.rs +++ b/src/table.rs @@ -306,7 +306,20 @@ impl<F: TableSchema + 'static> Table<F> { Ok(resps_vals) } - async fn handle(&self, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> { + pub async fn rpc_call(&self, who: &UUID, rpc: &TableRPC<F>) -> Result<TableRPC<F>, Error> { + let rpc_bytes = rmp_to_vec_all_named(rpc)?; + let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes); + + let resp = rpc_call(self.system.clone(), who, &rpc_msg, self.param.timeout).await?; + if let Message::TableRPC(tbl, rep_by) = &resp { + if *tbl == self.name { + return Ok(rmp_serde::decode::from_read_ref(&rep_by)?); + } + } + Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp))) + } + + async fn handle(self: &Arc<Self>, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> { match msg { TableRPC::ReadEntry(key, sort_key) => { let value = self.handle_read_entry(&key, &sort_key)?; @@ -334,7 +347,7 @@ impl<F: TableSchema + 'static> Table<F> { } } - async fn handle_update(&self, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> { + async fn handle_update(self: &Arc<Self>, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> { for update_bytes in entries.drain(..) { let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?; @@ -360,7 +373,12 @@ impl<F: TableSchema + 'static> Table<F> { Ok((old_entry, new_entry)) })?; - self.instance.updated(old_entry, new_entry).await; + if old_entry.as_ref() != Some(&new_entry) { + self.instance.updated(old_entry, new_entry).await; + + let syncer = self.syncer.read().await.as_ref().unwrap().clone(); + self.system.background.spawn(syncer.invalidate(tree_key)); + } } Ok(()) } |