diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-09 18:43:53 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-09 18:43:53 +0200 |
commit | 1d786c2c663ac6f6e3e3ef52accd6e9eca049988 (patch) | |
tree | 813720a682fe7cf4947129020d73ccf4797d0c3a /src/table.rs | |
parent | 101444abb3967770ec378ee09f24eb2845dc091d (diff) | |
download | garage-1d786c2c663ac6f6e3e3ef52accd6e9eca049988.tar.gz garage-1d786c2c663ac6f6e3e3ef52accd6e9eca049988.zip |
Something works
Diffstat (limited to 'src/table.rs')
-rw-r--r-- | src/table.rs | 11 |
1 files changed, 8 insertions, 3 deletions
diff --git a/src/table.rs b/src/table.rs index 55ae9229..6a72c8bc 100644 --- a/src/table.rs +++ b/src/table.rs @@ -44,7 +44,7 @@ impl<F: TableFormat + 'static> TableRpcHandler for TableRpcHandlerAdapter<F> { async fn handle(&self, rpc: &[u8]) -> Result<Vec<u8>, Error> { let msg = rmp_serde::decode::from_read_ref::<_, TableRPC<F>>(rpc)?; let rep = self.table.handle(msg).await?; - Ok(rmp_serde::encode::to_vec_named(&rep)?) + Ok(rmp_to_vec_all_named(&rep)?) } } @@ -129,6 +129,7 @@ impl<F: TableFormat + 'static> Table<F> { let hash = e.partition_key().hash(); let who = self.system.members.read().await .walk_ring(&hash, self.param.replication_factor); + eprintln!("insert who: {:?}", who); let rpc = &TableRPC::<F>::Update(vec![e.clone()]); @@ -142,6 +143,7 @@ impl<F: TableFormat + 'static> Table<F> { let hash = partition_key.hash(); let who = self.system.members.read().await .walk_ring(&hash, self.param.replication_factor); + eprintln!("get who: {:?}", who); let rpc = &TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self.rpc_try_call_many(&who[..], @@ -169,7 +171,9 @@ impl<F: TableFormat + 'static> Table<F> { } async fn rpc_try_call_many(&self, who: &[UUID], rpc: &TableRPC<F>, quorum: usize) -> Result<Vec<TableRPC<F>>, Error> { - let rpc_bytes = rmp_serde::encode::to_vec_named(rpc)?; + eprintln!("Table RPC to {:?}: {}", who, serde_json::to_string(&rpc)?); + + let rpc_bytes = rmp_to_vec_all_named(rpc)?; let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes); let resps = rpc_try_call_many(self.system.clone(), @@ -188,6 +192,7 @@ impl<F: TableFormat + 'static> Table<F> { } return Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp))) } + eprintln!("Table RPC responses: {}", serde_json::to_string(&resps_vals)?); Ok(resps_vals) } @@ -228,7 +233,7 @@ impl<F: TableFormat + 'static> Table<F> { None => None }; - let new_bytes = rmp_serde::encode::to_vec_named(&entry)?; + let new_bytes = rmp_to_vec_all_named(&entry)?; self.store.insert(&tree_key, new_bytes)?; self.instance.updated(old_val.as_ref(), &entry).await; |