diff options
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r-- | src/table/sync.rs | 58 |
1 files changed, 31 insertions, 27 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs index ac0305e2..9c148393 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -14,6 +14,7 @@ use tokio::sync::{mpsc, watch}; use garage_util::data::*; use garage_util::error::Error; +use garage_rpc::membership::System; use garage_rpc::ring::Ring; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; @@ -29,8 +30,9 @@ const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30); const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); pub struct TableSyncer<F: TableSchema, R: TableReplication> { - data: Arc<TableData<F>>, - aux: Arc<TableAux<R>>, + system: Arc<System>, + data: Arc<TableData<F, R>>, + merkle: Arc<MerkleUpdater<F, R>>, todo: Mutex<SyncTodo>, rpc_client: Arc<RpcClient<SyncRPC>>, @@ -76,18 +78,20 @@ where R: TableReplication + 'static, { pub(crate) fn launch( - data: Arc<TableData<F>>, - aux: Arc<TableAux<R>>, + system: Arc<System>, + data: Arc<TableData<F, R>>, + merkle: Arc<MerkleUpdater<F, R>>, rpc_server: &mut RpcServer, ) -> Arc<Self> { let rpc_path = format!("table_{}/sync", data.name); - let rpc_client = aux.system.rpc_client::<SyncRPC>(&rpc_path); + let rpc_client = system.rpc_client::<SyncRPC>(&rpc_path); let todo = SyncTodo { todo: vec![] }; let syncer = Arc::new(Self { + system: system.clone(), data: data.clone(), - aux: aux.clone(), + merkle, todo: Mutex::new(todo), rpc_client, }); @@ -97,13 +101,13 @@ where let (busy_tx, busy_rx) = mpsc::unbounded_channel(); let s1 = syncer.clone(); - aux.system.background.spawn_worker( + system.background.spawn_worker( format!("table sync watcher for {}", data.name), move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx), ); let s2 = syncer.clone(); - aux.system.background.spawn_worker( + system.background.spawn_worker( format!("table syncer for {}", data.name), move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx), ); @@ -126,7 +130,7 @@ where let self2 = self.clone(); self.rpc_client - .set_local_handler(self.aux.system.id, move |msg| { + .set_local_handler(self.system.id, move |msg| { let self2 = self2.clone(); async move { self2.handle_rpc(&msg).await } }); @@ -137,8 +141,8 @@ where mut must_exit: watch::Receiver<bool>, mut busy_rx: mpsc::UnboundedReceiver<bool>, ) { - let mut prev_ring: Arc<Ring> = self.aux.system.ring.borrow().clone(); - let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone(); + let mut prev_ring: Arc<Ring> = self.system.ring.borrow().clone(); + let mut ring_recv: watch::Receiver<Arc<Ring>> = self.system.ring.clone(); let mut nothing_to_do_since = Some(Instant::now()); while !*must_exit.borrow() { @@ -178,7 +182,7 @@ where self.todo .lock() .unwrap() - .add_full_sync(&self.data, &self.aux); + .add_full_sync(&self.data, &self.system); } async fn syncer_task( @@ -213,10 +217,10 @@ where must_exit: &mut watch::Receiver<bool>, ) -> Result<(), Error> { if partition.retain { - let my_id = self.aux.system.id; + let my_id = self.system.id; let nodes = self - .aux + .data .replication .write_nodes(&hash_of_merkle_partition(partition.range.begin)) .into_iter() @@ -242,7 +246,7 @@ where warn!("({}) Sync error: {}", self.data.name, e); } } - if n_errors > self.aux.replication.max_write_errors() { + if n_errors > self.data.replication.max_write_errors() { return Err(Error::Message(format!( "Sync failed with too many nodes (should have been: {:?}).", nodes @@ -288,19 +292,19 @@ where if items.len() > 0 { let nodes = self - .aux + .data .replication .write_nodes(&begin) .into_iter() .collect::<Vec<_>>(); - if nodes.contains(&self.aux.system.id) { + if nodes.contains(&self.system.id) { warn!( "({}) Interrupting offload as partitions seem to have changed", self.data.name ); break; } - if nodes.len() < self.aux.replication.write_quorum() { + if nodes.len() < self.data.replication.write_quorum() { return Err(Error::Message(format!( "Not offloading as we don't have a quorum of nodes to write to." ))); @@ -376,7 +380,7 @@ where partition: u16::to_be_bytes(i), prefix: vec![], }; - match self.data.merkle_updater.read_node(&key)? { + match self.merkle.read_node(&key)? { MerkleNode::Empty => (), x => { ret.push((key.partition, hash_of(&x)?)); @@ -458,7 +462,7 @@ where while !todo.is_empty() && !*must_exit.borrow() { let key = todo.pop_front().unwrap(); - let node = self.data.merkle_updater.read_node(&key)?; + let node = self.merkle.read_node(&key)?; match node { MerkleNode::Empty => { @@ -570,7 +574,7 @@ where } } SyncRPC::GetNode(k) => { - let node = self.data.merkle_updater.read_node(&k)?; + let node = self.merkle.read_node(&k)?; Ok(SyncRPC::Node(k.clone(), node)) } SyncRPC::Items(items) => { @@ -585,15 +589,15 @@ where impl SyncTodo { fn add_full_sync<F: TableSchema, R: TableReplication>( &mut self, - data: &TableData<F>, - aux: &TableAux<R>, + data: &TableData<F, R>, + system: &System, ) { - let my_id = aux.system.id; + let my_id = system.id; self.todo.clear(); - let ring = aux.system.ring.borrow().clone(); - let split_points = aux.replication.split_points(&ring); + let ring = system.ring.borrow().clone(); + let split_points = data.replication.split_points(&ring); for i in 0..split_points.len() { let begin: MerklePartition = { @@ -613,7 +617,7 @@ impl SyncTodo { let begin_hash = hash_of_merkle_partition(begin); let end_hash = hash_of_merkle_partition_opt(end); - let nodes = aux.replication.write_nodes(&begin_hash); + let nodes = data.replication.write_nodes(&begin_hash); let retain = nodes.contains(&my_id); if !retain { |