aboutsummaryrefslogtreecommitdiff
path: root/src/table/table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/table.rs')
-rw-r--r--src/table/table.rs35
1 files changed, 15 insertions, 20 deletions
diff --git a/src/table/table.rs b/src/table/table.rs
index ad263343..e1357471 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -34,7 +34,6 @@ pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
#[derive(Serialize, Deserialize)]
pub(crate) enum TableRpc<F: TableSchema> {
Ok,
- Error(String),
ReadEntry(F::P, F::S),
ReadEntryResponse(Option<ByteBuf>),
@@ -45,8 +44,8 @@ pub(crate) enum TableRpc<F: TableSchema> {
Update(Vec<Arc<ByteBuf>>),
}
-impl<F: TableSchema> Message for TableRpc<F> {
- type Response = TableRpc<F>;
+impl<F: TableSchema> Rpc for TableRpc<F> {
+ type Response = Result<TableRpc<F>, Error>;
}
impl<F, R> Table<F, R>
@@ -277,7 +276,7 @@ where
// =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============
- async fn repair_on_read(&self, who: &[NodeID], what: F::E) -> Result<(), Error> {
+ async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
self.system
.rpc
@@ -292,10 +291,19 @@ where
.await?;
Ok(())
}
+}
- // ====== RPC HANDLER =====
- //
- async fn handle_rpc(self: &Arc<Self>, msg: &TableRpc<F>) -> Result<TableRpc<F>, Error> {
+#[async_trait]
+impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ async fn handle(
+ self: &Arc<Self>,
+ msg: &TableRpc<F>,
+ _from: NodeID,
+ ) -> Result<TableRpc<F>, Error> {
match msg {
TableRpc::ReadEntry(key, sort_key) => {
let value = self.data.read_entry(key, sort_key)?;
@@ -313,16 +321,3 @@ where
}
}
}
-
-#[async_trait]
-impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- async fn handle(self: &Arc<Self>, msg: &TableRpc<F>, _from: NodeID) -> TableRpc<F> {
- self.handle_rpc(msg)
- .await
- .unwrap_or_else(|e| TableRpc::<F>::Error(format!("{}", e)))
- }
-}