aboutsummaryrefslogtreecommitdiff
path: root/src/table/table.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-03-12 15:07:23 +0100
committerAlex Auvolat <alex@adnab.me>2021-03-12 15:07:23 +0100
commitcbe7e1a66a9dceaaeae0467b4eefe51afd5b297c (patch)
treee31874c3ac2a01e19e62226bdabcf74d0c57d1f3 /src/table/table.rs
parent8860aa19b867183b83ee48efd9990cd34e567f53 (diff)
downloadgarage-cbe7e1a66a9dceaaeae0467b4eefe51afd5b297c.tar.gz
garage-cbe7e1a66a9dceaaeae0467b4eefe51afd5b297c.zip
Move table rpc client out of tableaux
Diffstat (limited to 'src/table/table.rs')
-rw-r--r--src/table/table.rs31
1 files changed, 13 insertions, 18 deletions
diff --git a/src/table/table.rs b/src/table/table.rs
index edb1be3f..dd3394bd 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -21,16 +21,16 @@ use crate::sync::*;
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
-pub struct TableAux<F: TableSchema, R: TableReplication> {
+pub struct TableAux<R: TableReplication> {
pub system: Arc<System>,
pub replication: R,
- rpc_client: Arc<RpcClient<TableRPC<F>>>,
}
pub struct Table<F: TableSchema, R: TableReplication> {
pub data: Arc<TableData<F>>,
- pub aux: Arc<TableAux<F, R>>,
+ pub aux: Arc<TableAux<R>>,
pub syncer: Arc<TableSyncer<F, R>>,
+ rpc_client: Arc<RpcClient<TableRPC<F>>>,
}
#[derive(Serialize, Deserialize)]
@@ -73,12 +73,16 @@ where
let aux = Arc::new(TableAux {
system,
replication,
- rpc_client,
});
let syncer = TableSyncer::launch(data.clone(), aux.clone(), rpc_server);
- let table = Arc::new(Self { data, aux, syncer });
+ let table = Arc::new(Self {
+ data,
+ aux,
+ syncer,
+ rpc_client,
+ });
table.clone().register_handler(rpc_server, rpc_path);
@@ -93,8 +97,7 @@ where
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
let rpc = TableRPC::<F>::Update(vec![e_enc]);
- self.aux
- .rpc_client
+ self.rpc_client
.try_call_many(
&who[..],
rpc,
@@ -123,11 +126,7 @@ where
let call_futures = call_list.drain().map(|(node, entries)| async move {
let rpc = TableRPC::<F>::Update(entries);
- let resp = self
- .aux
- .rpc_client
- .call(node, rpc, TABLE_RPC_TIMEOUT)
- .await?;
+ let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?;
Ok::<_, Error>((node, resp))
});
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
@@ -156,7 +155,6 @@ where
let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self
- .aux
.rpc_client
.try_call_many(
&who[..],
@@ -214,7 +212,6 @@ where
let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
let resps = self
- .aux
.rpc_client
.try_call_many(
&who[..],
@@ -270,8 +267,7 @@ where
async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
- self.aux
- .rpc_client
+ self.rpc_client
.try_call_many(
&who[..],
TableRPC::<F>::Update(vec![what_enc]),
@@ -291,8 +287,7 @@ where
});
let self2 = self.clone();
- self.aux
- .rpc_client
+ self.rpc_client
.set_local_handler(self.aux.system.id, move |msg| {
let self2 = self2.clone();
async move { self2.handle(&msg).await }