aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout/manager.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-11-09 16:32:31 +0100
committerAlex Auvolat <alex@adnab.me>2023-11-09 16:32:31 +0100
commitdf36cf3099f6010c4fc62109b85d4d1e62f160cc (patch)
treec2ad68640220da38796d63f24117cf56f3f77bb9 /src/rpc/layout/manager.rs
parent9d95f6f7040c1899715ae4f984313427b1432758 (diff)
downloadgarage-df36cf3099f6010c4fc62109b85d4d1e62f160cc.tar.gz
garage-df36cf3099f6010c4fc62109b85d4d1e62f160cc.zip
layout: add helpers to LayoutHistory and prepare integration with Table
Diffstat (limited to 'src/rpc/layout/manager.rs')
-rw-r--r--src/rpc/layout/manager.rs33
1 files changed, 32 insertions, 1 deletions
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index c1417dac..b0302b12 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -1,4 +1,5 @@
-use std::sync::{Arc, RwLock, RwLockReadGuard};
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
use std::time::Duration;
use serde::{Deserialize, Serialize};
@@ -26,6 +27,8 @@ pub struct LayoutManager {
layout: Arc<RwLock<LayoutHistory>>,
pub(crate) change_notify: Arc<Notify>,
+ table_sync_version: Mutex<HashMap<String, u64>>,
+
pub(crate) rpc_helper: RpcHelper,
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
}
@@ -117,6 +120,34 @@ impl LayoutManager {
Ok(())
}
+ pub fn add_table(&self, table_name: &'static str) {
+ let first_version = self.layout().versions.first().unwrap().version;
+
+ self.table_sync_version
+ .lock()
+ .unwrap()
+ .insert(table_name.to_string(), first_version);
+ }
+
+ pub fn sync_table_until(self: &Arc<Self>, table_name: &'static str, version: u64) {
+ let mut table_sync_version = self.table_sync_version.lock().unwrap();
+ *table_sync_version.get_mut(table_name).unwrap() = version;
+ let sync_until = table_sync_version.iter().map(|(_, v)| *v).max().unwrap();
+ drop(table_sync_version);
+
+ let mut layout = self.layout.write().unwrap();
+ if layout
+ .update_trackers
+ .sync_map
+ .set_max(self.node_id, sync_until)
+ {
+ layout.update_hashes();
+ self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(
+ layout.update_trackers.clone(),
+ ));
+ }
+ }
+
// ---- INTERNALS ---
fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> {