diff options
Diffstat (limited to 'src/table/table.rs')
-rw-r--r-- | src/table/table.rs | 53 |
1 files changed, 26 insertions, 27 deletions
diff --git a/src/table/table.rs b/src/table/table.rs index 2ce5868f..f00b4239 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -16,20 +16,17 @@ use garage_rpc::rpc_server::*; use crate::crdt::CRDT; use crate::data::*; use crate::gc::*; +use crate::merkle::*; use crate::replication::*; use crate::schema::*; use crate::sync::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); -pub struct TableAux<R: TableReplication> { - pub system: Arc<System>, - pub replication: R, -} - pub struct Table<F: TableSchema, R: TableReplication> { - pub data: Arc<TableData<F>>, - pub aux: Arc<TableAux<R>>, + pub system: Arc<System>, + pub data: Arc<TableData<F, R>>, + pub merkle_updater: Arc<MerkleUpdater<F, R>>, pub syncer: Arc<TableSyncer<F, R>>, rpc_client: Arc<RpcClient<TableRPC<F>>>, } @@ -67,19 +64,22 @@ where let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path); - let data = TableData::new(name, instance, db, system.background.clone()); + let data = TableData::new(name, instance, replication, db); - let aux = Arc::new(TableAux { - system, - replication, - }); + let merkle_updater = MerkleUpdater::launch(data.clone(), system.background.clone()); - let syncer = TableSyncer::launch(data.clone(), aux.clone(), rpc_server); - TableGC::launch(data.clone(), aux.clone(), rpc_server); + let syncer = TableSyncer::launch( + system.clone(), + data.clone(), + merkle_updater.clone(), + rpc_server, + ); + TableGC::launch(data.clone(), system.clone(), rpc_server); let table = Arc::new(Self { + system, data, - aux, + merkle_updater, syncer, rpc_client, }); @@ -91,7 +91,7 @@ where pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let who = self.aux.replication.write_nodes(&hash); + let who = self.data.replication.write_nodes(&hash); //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); @@ -101,7 +101,7 @@ where .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.aux.replication.write_quorum()) + RequestStrategy::with_quorum(self.data.replication.write_quorum()) .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; @@ -113,7 +113,7 @@ where for entry in entries.iter() { let hash = entry.partition_key().hash(); - let who = self.aux.replication.write_nodes(&hash); + let who = self.data.replication.write_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { if !call_list.contains_key(&node) { @@ -137,7 +137,7 @@ where errors.push(e); } } - if errors.len() > self.aux.replication.max_write_errors() { + if errors.len() > self.data.replication.max_write_errors() { Err(Error::Message("Too many errors".into())) } else { Ok(()) @@ -150,7 +150,7 @@ where sort_key: &F::S, ) -> Result<Option<F::E>, Error> { let hash = partition_key.hash(); - let who = self.aux.replication.read_nodes(&hash); + let who = self.data.replication.read_nodes(&hash); //eprintln!("get who: {:?}", who); let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); @@ -159,7 +159,7 @@ where .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.aux.replication.read_quorum()) + RequestStrategy::with_quorum(self.data.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -190,8 +190,7 @@ where if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); - self.aux - .system + self.system .background .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); } @@ -207,7 +206,7 @@ where limit: usize, ) -> Result<Vec<F::E>, Error> { let hash = partition_key.hash(); - let who = self.aux.replication.read_nodes(&hash); + let who = self.data.replication.read_nodes(&hash); let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); @@ -216,7 +215,7 @@ where .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.aux.replication.read_quorum()) + RequestStrategy::with_quorum(self.data.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -248,7 +247,7 @@ where } if !to_repair.is_empty() { let self2 = self.clone(); - self.aux.system.background.spawn_cancellable(async move { + self.system.background.spawn_cancellable(async move { for (_, v) in to_repair.iter_mut() { self2.repair_on_read(&who[..], v.take().unwrap()).await?; } @@ -288,7 +287,7 @@ where let self2 = self.clone(); self.rpc_client - .set_local_handler(self.aux.system.id, move |msg| { + .set_local_handler(self.system.id, move |msg| { let self2 = self2.clone(); async move { self2.handle(&msg).await } }); |