aboutsummaryrefslogtreecommitdiff
path: root/src/table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table.rs')
-rw-r--r--src/table.rs11
1 files changed, 8 insertions, 3 deletions
diff --git a/src/table.rs b/src/table.rs
index 55ae9229..6a72c8bc 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -44,7 +44,7 @@ impl<F: TableFormat + 'static> TableRpcHandler for TableRpcHandlerAdapter<F> {
async fn handle(&self, rpc: &[u8]) -> Result<Vec<u8>, Error> {
let msg = rmp_serde::decode::from_read_ref::<_, TableRPC<F>>(rpc)?;
let rep = self.table.handle(msg).await?;
- Ok(rmp_serde::encode::to_vec_named(&rep)?)
+ Ok(rmp_to_vec_all_named(&rep)?)
}
}
@@ -129,6 +129,7 @@ impl<F: TableFormat + 'static> Table<F> {
let hash = e.partition_key().hash();
let who = self.system.members.read().await
.walk_ring(&hash, self.param.replication_factor);
+ eprintln!("insert who: {:?}", who);
let rpc = &TableRPC::<F>::Update(vec![e.clone()]);
@@ -142,6 +143,7 @@ impl<F: TableFormat + 'static> Table<F> {
let hash = partition_key.hash();
let who = self.system.members.read().await
.walk_ring(&hash, self.param.replication_factor);
+ eprintln!("get who: {:?}", who);
let rpc = &TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self.rpc_try_call_many(&who[..],
@@ -169,7 +171,9 @@ impl<F: TableFormat + 'static> Table<F> {
}
async fn rpc_try_call_many(&self, who: &[UUID], rpc: &TableRPC<F>, quorum: usize) -> Result<Vec<TableRPC<F>>, Error> {
- let rpc_bytes = rmp_serde::encode::to_vec_named(rpc)?;
+ eprintln!("Table RPC to {:?}: {}", who, serde_json::to_string(&rpc)?);
+
+ let rpc_bytes = rmp_to_vec_all_named(rpc)?;
let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes);
let resps = rpc_try_call_many(self.system.clone(),
@@ -188,6 +192,7 @@ impl<F: TableFormat + 'static> Table<F> {
}
return Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp)))
}
+ eprintln!("Table RPC responses: {}", serde_json::to_string(&resps_vals)?);
Ok(resps_vals)
}
@@ -228,7 +233,7 @@ impl<F: TableFormat + 'static> Table<F> {
None => None
};
- let new_bytes = rmp_serde::encode::to_vec_named(&entry)?;
+ let new_bytes = rmp_to_vec_all_named(&entry)?;
self.store.insert(&tree_key, new_bytes)?;
self.instance.updated(old_val.as_ref(), &entry).await;