diff options
Diffstat (limited to 'src/table/table.rs')
-rw-r--r-- | src/table/table.rs | 50 |
1 files changed, 22 insertions, 28 deletions
diff --git a/src/table/table.rs b/src/table/table.rs index a4cb4b24..516c9358 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -15,9 +15,9 @@ use garage_rpc::rpc_server::*; use crate::crdt::CRDT; use crate::data::*; -use crate::schema::*; -use crate::table_sync::*; use crate::replication::*; +use crate::schema::*; +use crate::sync::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); @@ -50,7 +50,6 @@ pub(crate) enum TableRPC<F: TableSchema> { impl<F: TableSchema> RpcMessage for TableRPC<F> {} - impl<F, R> Table<F, R> where F: TableSchema + 'static, @@ -69,29 +68,17 @@ where let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path); - let data = TableData::new( - name, - instance, - db, - system.background.clone(), - ); + let data = TableData::new(name, instance, db, system.background.clone()); - let aux = Arc::new(TableAux{ + let aux = Arc::new(TableAux { system, replication, rpc_client, }); - let syncer = TableSyncer::launch( - data.clone(), - aux.clone(), - ); + let syncer = TableSyncer::launch(data.clone(), aux.clone()); - let table = Arc::new(Self { - data, - aux, - syncer, - }); + let table = Arc::new(Self { data, aux, syncer }); table.clone().register_handler(rpc_server, rpc_path); @@ -106,7 +93,8 @@ where let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = TableRPC::<F>::Update(vec![e_enc]); - self.aux.rpc_client + self.aux + .rpc_client .try_call_many( &who[..], rpc, @@ -135,7 +123,11 @@ where let call_futures = call_list.drain().map(|(node, entries)| async move { let rpc = TableRPC::<F>::Update(entries); - let resp = self.aux.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; + let resp = self + .aux + .rpc_client + .call(node, rpc, TABLE_RPC_TIMEOUT) + .await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::<FuturesUnordered<_>>(); @@ -200,7 +192,8 @@ where if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); - self.aux.system + self.aux + .system .background .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); } @@ -221,7 +214,8 @@ where let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); let resps = self - .aux.rpc_client + .aux + .rpc_client .try_call_many( &who[..], rpc, @@ -276,7 +270,8 @@ where async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); - self.aux.rpc_client + self.aux + .rpc_client .try_call_many( &who[..], TableRPC::<F>::Update(vec![what_enc]), @@ -296,7 +291,8 @@ where }); let self2 = self.clone(); - self.aux.rpc_client + self.aux + .rpc_client .set_local_handler(self.aux.system.id, move |msg| { let self2 = self2.clone(); async move { self2.handle(&msg).await } @@ -318,9 +314,7 @@ where Ok(TableRPC::Ok) } TableRPC::SyncRPC(rpc) => { - let response = self.syncer - .handle_rpc(rpc, self.aux.system.background.stop_signal.clone()) - .await?; + let response = self.syncer.handle_rpc(rpc).await?; Ok(TableRPC::SyncRPC(response)) } _ => Err(Error::BadRPC(format!("Unexpected table RPC"))), |