diff options
author | Alex Auvolat <alex@adnab.me> | 2023-11-09 16:32:31 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-11-09 16:32:31 +0100 |
commit | df36cf3099f6010c4fc62109b85d4d1e62f160cc (patch) | |
tree | c2ad68640220da38796d63f24117cf56f3f77bb9 /src/rpc/layout/manager.rs | |
parent | 9d95f6f7040c1899715ae4f984313427b1432758 (diff) | |
download | garage-df36cf3099f6010c4fc62109b85d4d1e62f160cc.tar.gz garage-df36cf3099f6010c4fc62109b85d4d1e62f160cc.zip |
layout: add helpers to LayoutHistory and prepare integration with Table
Diffstat (limited to 'src/rpc/layout/manager.rs')
-rw-r--r-- | src/rpc/layout/manager.rs | 33 |
1 files changed, 32 insertions, 1 deletions
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index c1417dac..b0302b12 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -1,4 +1,5 @@ -use std::sync::{Arc, RwLock, RwLockReadGuard}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard}; use std::time::Duration; use serde::{Deserialize, Serialize}; @@ -26,6 +27,8 @@ pub struct LayoutManager { layout: Arc<RwLock<LayoutHistory>>, pub(crate) change_notify: Arc<Notify>, + table_sync_version: Mutex<HashMap<String, u64>>, + pub(crate) rpc_helper: RpcHelper, system_endpoint: Arc<Endpoint<SystemRpc, System>>, } @@ -117,6 +120,34 @@ impl LayoutManager { Ok(()) } + pub fn add_table(&self, table_name: &'static str) { + let first_version = self.layout().versions.first().unwrap().version; + + self.table_sync_version + .lock() + .unwrap() + .insert(table_name.to_string(), first_version); + } + + pub fn sync_table_until(self: &Arc<Self>, table_name: &'static str, version: u64) { + let mut table_sync_version = self.table_sync_version.lock().unwrap(); + *table_sync_version.get_mut(table_name).unwrap() = version; + let sync_until = table_sync_version.iter().map(|(_, v)| *v).max().unwrap(); + drop(table_sync_version); + + let mut layout = self.layout.write().unwrap(); + if layout + .update_trackers + .sync_map + .set_max(self.node_id, sync_until) + { + layout.update_hashes(); + self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( + layout.update_trackers.clone(), + )); + } + } + // ---- INTERNALS --- fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> { |