diff options
author | Alex Auvolat <alex@adnab.me> | 2021-03-12 15:07:23 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-03-12 15:07:23 +0100 |
commit | cbe7e1a66a9dceaaeae0467b4eefe51afd5b297c (patch) | |
tree | e31874c3ac2a01e19e62226bdabcf74d0c57d1f3 /src/table | |
parent | 8860aa19b867183b83ee48efd9990cd34e567f53 (diff) | |
download | garage-cbe7e1a66a9dceaaeae0467b4eefe51afd5b297c.tar.gz garage-cbe7e1a66a9dceaaeae0467b4eefe51afd5b297c.zip |
Move table rpc client out of tableaux
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/sync.rs | 6 | ||||
-rw-r--r-- | src/table/table.rs | 31 |
2 files changed, 16 insertions, 21 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs index 23161d15..4be8cd10 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -31,7 +31,7 @@ const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); pub struct TableSyncer<F: TableSchema, R: TableReplication> { data: Arc<TableData<F>>, - aux: Arc<TableAux<F, R>>, + aux: Arc<TableAux<R>>, todo: Mutex<SyncTodo>, rpc_client: Arc<RpcClient<SyncRPC>>, @@ -78,7 +78,7 @@ where { pub(crate) fn launch( data: Arc<TableData<F>>, - aux: Arc<TableAux<F, R>>, + aux: Arc<TableAux<R>>, rpc_server: &mut RpcServer, ) -> Arc<Self> { let rpc_path = format!("table_{}/sync", data.name); @@ -605,7 +605,7 @@ impl SyncTodo { fn add_full_sync<F: TableSchema, R: TableReplication>( &mut self, data: &TableData<F>, - aux: &TableAux<F, R>, + aux: &TableAux<R>, ) { let my_id = aux.system.id; diff --git a/src/table/table.rs b/src/table/table.rs index edb1be3f..dd3394bd 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -21,16 +21,16 @@ use crate::sync::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); -pub struct TableAux<F: TableSchema, R: TableReplication> { +pub struct TableAux<R: TableReplication> { pub system: Arc<System>, pub replication: R, - rpc_client: Arc<RpcClient<TableRPC<F>>>, } pub struct Table<F: TableSchema, R: TableReplication> { pub data: Arc<TableData<F>>, - pub aux: Arc<TableAux<F, R>>, + pub aux: Arc<TableAux<R>>, pub syncer: Arc<TableSyncer<F, R>>, + rpc_client: Arc<RpcClient<TableRPC<F>>>, } #[derive(Serialize, Deserialize)] @@ -73,12 +73,16 @@ where let aux = Arc::new(TableAux { system, replication, - rpc_client, }); let syncer = TableSyncer::launch(data.clone(), aux.clone(), rpc_server); - let table = Arc::new(Self { data, aux, syncer }); + let table = Arc::new(Self { + data, + aux, + syncer, + rpc_client, + }); table.clone().register_handler(rpc_server, rpc_path); @@ -93,8 +97,7 @@ where let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = TableRPC::<F>::Update(vec![e_enc]); - self.aux - .rpc_client + self.rpc_client .try_call_many( &who[..], rpc, @@ -123,11 +126,7 @@ where let call_futures = call_list.drain().map(|(node, entries)| async move { let rpc = TableRPC::<F>::Update(entries); - let resp = self - .aux - .rpc_client - .call(node, rpc, TABLE_RPC_TIMEOUT) - .await?; + let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::<FuturesUnordered<_>>(); @@ -156,7 +155,6 @@ where let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self - .aux .rpc_client .try_call_many( &who[..], @@ -214,7 +212,6 @@ where let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); let resps = self - .aux .rpc_client .try_call_many( &who[..], @@ -270,8 +267,7 @@ where async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); - self.aux - .rpc_client + self.rpc_client .try_call_many( &who[..], TableRPC::<F>::Update(vec![what_enc]), @@ -291,8 +287,7 @@ where }); let self2 = self.clone(); - self.aux - .rpc_client + self.rpc_client .set_local_handler(self.aux.system.id, move |msg| { let self2 = self2.clone(); async move { self2.handle(&msg).await } |