diff options
author | Alex Auvolat <alex@adnab.me> | 2021-10-15 11:05:09 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-10-22 16:55:24 +0200 |
commit | 1b450c4b493dfcb2ee88acbca3ea584beac8eb4b (patch) | |
tree | d6437f105a630fa197b67446b5c3b2902335c34a /src/table/table.rs | |
parent | 4067797d0142ee7860aff8da95d65820d6cc0889 (diff) | |
download | garage-1b450c4b493dfcb2ee88acbca3ea584beac8eb4b.tar.gz garage-1b450c4b493dfcb2ee88acbca3ea584beac8eb4b.zip |
Improvements to CLI and various fixes for netapp version
Discovery via consul, persist peer list to file
Diffstat (limited to 'src/table/table.rs')
-rw-r--r-- | src/table/table.rs | 35 |
1 files changed, 15 insertions, 20 deletions
diff --git a/src/table/table.rs b/src/table/table.rs index ad263343..e1357471 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -34,7 +34,6 @@ pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> { #[derive(Serialize, Deserialize)] pub(crate) enum TableRpc<F: TableSchema> { Ok, - Error(String), ReadEntry(F::P, F::S), ReadEntryResponse(Option<ByteBuf>), @@ -45,8 +44,8 @@ pub(crate) enum TableRpc<F: TableSchema> { Update(Vec<Arc<ByteBuf>>), } -impl<F: TableSchema> Message for TableRpc<F> { - type Response = TableRpc<F>; +impl<F: TableSchema> Rpc for TableRpc<F> { + type Response = Result<TableRpc<F>, Error>; } impl<F, R> Table<F, R> @@ -277,7 +276,7 @@ where // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS =============== - async fn repair_on_read(&self, who: &[NodeID], what: F::E) -> Result<(), Error> { + async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); self.system .rpc @@ -292,10 +291,19 @@ where .await?; Ok(()) } +} - // ====== RPC HANDLER ===== - // - async fn handle_rpc(self: &Arc<Self>, msg: &TableRpc<F>) -> Result<TableRpc<F>, Error> { +#[async_trait] +impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + async fn handle( + self: &Arc<Self>, + msg: &TableRpc<F>, + _from: NodeID, + ) -> Result<TableRpc<F>, Error> { match msg { TableRpc::ReadEntry(key, sort_key) => { let value = self.data.read_entry(key, sort_key)?; @@ -313,16 +321,3 @@ where } } } - -#[async_trait] -impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ - async fn handle(self: &Arc<Self>, msg: &TableRpc<F>, _from: NodeID) -> TableRpc<F> { - self.handle_rpc(msg) - .await - .unwrap_or_else(|e| TableRpc::<F>::Error(format!("{}", e))) - } -} |