aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout/manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/layout/manager.rs')
-rw-r--r--src/rpc/layout/manager.rs22
1 files changed, 9 insertions, 13 deletions
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index 21ec2d8d..e270ad21 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -24,7 +24,7 @@ pub struct LayoutManager {
replication_factor: usize,
persist_cluster_layout: Persister<LayoutHistory>,
- layout: Arc<RwLock<LayoutHistory>>,
+ layout: Arc<RwLock<LayoutHelper>>,
pub(crate) change_notify: Arc<Notify>,
table_sync_version: Mutex<HashMap<String, u64>>,
@@ -54,7 +54,7 @@ impl LayoutManager {
let persist_cluster_layout: Persister<LayoutHistory> =
Persister::new(&config.metadata_dir, "cluster_layout");
- let mut cluster_layout = match persist_cluster_layout.load() {
+ let cluster_layout = match persist_cluster_layout.load() {
Ok(x) => {
if x.current().replication_factor != replication_factor {
return Err(Error::Message(format!(
@@ -74,6 +74,7 @@ impl LayoutManager {
}
};
+ let mut cluster_layout = LayoutHelper::new(cluster_layout);
cluster_layout.update_trackers_of(node_id.into());
let layout = Arc::new(RwLock::new(cluster_layout));
@@ -100,7 +101,7 @@ impl LayoutManager {
// ---- PUBLIC INTERFACE ----
- pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> {
+ pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHelper> {
self.layout.read().unwrap()
}
@@ -108,8 +109,8 @@ impl LayoutManager {
let layout = self.layout();
LayoutStatus {
cluster_layout_version: layout.current().version,
- cluster_layout_trackers_hash: layout.trackers_hash,
- cluster_layout_staging_hash: layout.staging_hash,
+ cluster_layout_trackers_hash: layout.trackers_hash(),
+ cluster_layout_staging_hash: layout.staging_hash(),
}
}
@@ -137,13 +138,8 @@ impl LayoutManager {
drop(table_sync_version);
let mut layout = self.layout.write().unwrap();
- if layout
- .update_trackers
- .sync_map
- .set_max(self.node_id, sync_until)
- {
+ if layout.update(|l| l.update_trackers.sync_map.set_max(self.node_id, sync_until)) {
debug!("sync_until updated to {}", sync_until);
- layout.update_hashes();
self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(
layout.update_trackers.clone(),
));
@@ -157,7 +153,7 @@ impl LayoutManager {
let prev_layout_check = layout.check().is_ok();
if !prev_layout_check || adv.check().is_ok() {
- if layout.merge(adv) {
+ if layout.update(|l| l.merge(adv)) {
layout.update_trackers_of(self.node_id);
if prev_layout_check && layout.check().is_err() {
panic!("Merged two correct layouts and got an incorrect layout.");
@@ -171,7 +167,7 @@ impl LayoutManager {
fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option<UpdateTrackers> {
let mut layout = self.layout.write().unwrap();
if layout.update_trackers != *adv {
- if layout.update_trackers.merge(adv) {
+ if layout.update(|l| l.update_trackers.merge(adv)) {
layout.update_trackers_of(self.node_id);
return Some(layout.update_trackers.clone());
}