aboutsummaryrefslogtreecommitdiff
path: root/src/table/table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/table.rs')
-rw-r--r--src/table/table.rs50
1 files changed, 22 insertions, 28 deletions
diff --git a/src/table/table.rs b/src/table/table.rs
index a4cb4b24..516c9358 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -15,9 +15,9 @@ use garage_rpc::rpc_server::*;
use crate::crdt::CRDT;
use crate::data::*;
-use crate::schema::*;
-use crate::table_sync::*;
use crate::replication::*;
+use crate::schema::*;
+use crate::sync::*;
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
@@ -50,7 +50,6 @@ pub(crate) enum TableRPC<F: TableSchema> {
impl<F: TableSchema> RpcMessage for TableRPC<F> {}
-
impl<F, R> Table<F, R>
where
F: TableSchema + 'static,
@@ -69,29 +68,17 @@ where
let rpc_path = format!("table_{}", name);
let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
- let data = TableData::new(
- name,
- instance,
- db,
- system.background.clone(),
- );
+ let data = TableData::new(name, instance, db, system.background.clone());
- let aux = Arc::new(TableAux{
+ let aux = Arc::new(TableAux {
system,
replication,
rpc_client,
});
- let syncer = TableSyncer::launch(
- data.clone(),
- aux.clone(),
- );
+ let syncer = TableSyncer::launch(data.clone(), aux.clone());
- let table = Arc::new(Self {
- data,
- aux,
- syncer,
- });
+ let table = Arc::new(Self { data, aux, syncer });
table.clone().register_handler(rpc_server, rpc_path);
@@ -106,7 +93,8 @@ where
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
let rpc = TableRPC::<F>::Update(vec![e_enc]);
- self.aux.rpc_client
+ self.aux
+ .rpc_client
.try_call_many(
&who[..],
rpc,
@@ -135,7 +123,11 @@ where
let call_futures = call_list.drain().map(|(node, entries)| async move {
let rpc = TableRPC::<F>::Update(entries);
- let resp = self.aux.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?;
+ let resp = self
+ .aux
+ .rpc_client
+ .call(node, rpc, TABLE_RPC_TIMEOUT)
+ .await?;
Ok::<_, Error>((node, resp))
});
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
@@ -200,7 +192,8 @@ where
if not_all_same {
let self2 = self.clone();
let ent2 = ret_entry.clone();
- self.aux.system
+ self.aux
+ .system
.background
.spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
}
@@ -221,7 +214,8 @@ where
let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
let resps = self
- .aux.rpc_client
+ .aux
+ .rpc_client
.try_call_many(
&who[..],
rpc,
@@ -276,7 +270,8 @@ where
async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
- self.aux.rpc_client
+ self.aux
+ .rpc_client
.try_call_many(
&who[..],
TableRPC::<F>::Update(vec![what_enc]),
@@ -296,7 +291,8 @@ where
});
let self2 = self.clone();
- self.aux.rpc_client
+ self.aux
+ .rpc_client
.set_local_handler(self.aux.system.id, move |msg| {
let self2 = self2.clone();
async move { self2.handle(&msg).await }
@@ -318,9 +314,7 @@ where
Ok(TableRPC::Ok)
}
TableRPC::SyncRPC(rpc) => {
- let response = self.syncer
- .handle_rpc(rpc, self.aux.system.background.stop_signal.clone())
- .await?;
+ let response = self.syncer.handle_rpc(rpc).await?;
Ok(TableRPC::SyncRPC(response))
}
_ => Err(Error::BadRPC(format!("Unexpected table RPC"))),