diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/rpc/layout/helper.rs | 57 | ||||
-rw-r--r-- | src/rpc/layout/manager.rs | 8 |
2 files changed, 27 insertions, 38 deletions
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index b15f7540..3a033ab2 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -238,29 +238,15 @@ impl LayoutHelper { // ------------------ helpers for update tracking --------------- - pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) { + pub(crate) fn update_update_trackers(&mut self, local_node_id: Uuid) { // Ensure trackers for this node's values are up-to-date // 1. Acknowledge the last layout version which is not currently // locked by an in-progress write operation - self.ack_max_free(local_node_id); + self.update_ack_to_max_free(local_node_id); // 2. Assume the data on this node is sync'ed up at least to // the first layout version in the history - self.sync_first(local_node_id); - - // 3. Acknowledge everyone has synced up to min(self.sync_map) - self.sync_ack(local_node_id); - - debug!("ack_map: {:?}", self.inner().update_trackers.ack_map); - debug!("sync_map: {:?}", self.inner().update_trackers.sync_map); - debug!( - "sync_ack_map: {:?}", - self.inner().update_trackers.sync_ack_map - ); - } - - fn sync_first(&mut self, local_node_id: Uuid) { let first_version = self.inner().min_stored(); self.update(|layout| { layout @@ -268,9 +254,8 @@ impl LayoutHelper { .sync_map .set_max(local_node_id, first_version) }); - } - fn sync_ack(&mut self, local_node_id: Uuid) { + // 3. Acknowledge everyone has synced up to min(self.sync_map) let sync_map_min = self.sync_map_min; self.update(|layout| { layout @@ -278,24 +263,18 @@ impl LayoutHelper { .sync_ack_map .set_max(local_node_id, sync_map_min) }); - } - pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool { - let max_ack = self.max_free_ack(); - let changed = self.update(|layout| { - layout - .update_trackers - .ack_map - .set_max(local_node_id, max_ack) - }); - if changed { - info!("ack_until updated to {}", max_ack); - } - changed + debug!("ack_map: {:?}", self.inner().update_trackers.ack_map); + debug!("sync_map: {:?}", self.inner().update_trackers.sync_map); + debug!( + "sync_ack_map: {:?}", + self.inner().update_trackers.sync_ack_map + ); } - pub(crate) fn max_free_ack(&self) -> u64 { - self.versions() + pub(crate) fn update_ack_to_max_free(&mut self, local_node_id: Uuid) -> bool { + let max_free = self + .versions() .iter() .map(|x| x.version) .skip_while(|v| { @@ -305,6 +284,16 @@ impl LayoutHelper { .unwrap_or(true) }) .next() - .unwrap_or(self.current().version) + .unwrap_or(self.current().version); + let changed = self.update(|layout| { + layout + .update_trackers + .ack_map + .set_max(local_node_id, max_free) + }); + if changed { + info!("ack_until updated to {}", max_free); + } + changed } } diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 0ca532ba..a0dcf50e 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -70,7 +70,7 @@ impl LayoutManager { cluster_layout, Default::default(), ); - cluster_layout.update_trackers(node_id.into()); + cluster_layout.update_update_trackers(node_id.into()); let layout = Arc::new(RwLock::new(cluster_layout)); let change_notify = Arc::new(Notify::new()); @@ -134,7 +134,7 @@ impl LayoutManager { fn ack_new_version(self: &Arc<Self>) { let mut layout = self.layout.write().unwrap(); - if layout.ack_max_free(self.node_id) { + if layout.update_ack_to_max_free(self.node_id) { self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( layout.inner().update_trackers.clone(), )); @@ -164,7 +164,7 @@ impl LayoutManager { if !prev_layout_check || adv.check().is_ok() { if layout.update(|l| l.merge(adv)) { - layout.update_trackers(self.node_id); + layout.update_update_trackers(self.node_id); if prev_layout_check && !layout.is_check_ok() { panic!("Merged two correct layouts and got an incorrect layout."); } @@ -182,7 +182,7 @@ impl LayoutManager { if layout.inner().update_trackers != *adv { if layout.update(|l| l.update_trackers.merge(adv)) { - layout.update_trackers(self.node_id); + layout.update_update_trackers(self.node_id); assert!(layout.digest() != prev_digest); return Some(layout.inner().update_trackers.clone()); } |