diff options
-rw-r--r-- | src/rpc/layout/history.rs | 41 | ||||
-rw-r--r-- | src/rpc/layout/manager.rs | 33 | ||||
-rw-r--r-- | src/rpc/layout/schema.rs | 9 | ||||
-rw-r--r-- | src/rpc/layout/version.rs | 2 | ||||
-rw-r--r-- | src/table/table.rs | 2 |
5 files changed, 74 insertions, 13 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index e17a1c77..dbb02269 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -32,14 +32,6 @@ impl LayoutHistory { self.versions.last().as_ref().unwrap() } - pub fn all_storage_nodes(&self) -> HashSet<Uuid> { - self.versions - .iter() - .map(|x| x.nongateway_nodes()) - .flatten() - .collect::<HashSet<_>>() - } - pub fn update_hashes(&mut self) { self.trackers_hash = self.calculate_trackers_hash(); self.staging_hash = self.calculate_staging_hash(); @@ -53,6 +45,39 @@ impl LayoutHistory { blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) } + // ------------------ who stores what now? --------------- + + pub fn max_ack(&self) -> u64 { + self.calculate_global_min(&self.update_trackers.ack_map) + } + + pub fn all_storage_nodes(&self) -> HashSet<Uuid> { + // TODO: cache this + self.versions + .iter() + .map(|x| x.nongateway_nodes()) + .flatten() + .collect::<HashSet<_>>() + } + + pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> { + let sync_min = self.calculate_global_min(&self.update_trackers.sync_map); + let version = self + .versions + .iter() + .find(|x| x.version == sync_min) + .or(self.versions.last()) + .unwrap(); + version.nodes_of(position, version.replication_factor) + } + + pub fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> { + self.versions + .iter() + .map(|x| x.nodes_of(position, x.replication_factor)) + .collect::<Vec<_>>() + } + // ------------------ update tracking --------------- pub(crate) fn update_trackers(&mut self, node_id: Uuid) { diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index c1417dac..b0302b12 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -1,4 +1,5 @@ -use std::sync::{Arc, RwLock, RwLockReadGuard}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard}; use std::time::Duration; use serde::{Deserialize, Serialize}; @@ -26,6 +27,8 @@ pub struct LayoutManager { layout: Arc<RwLock<LayoutHistory>>, pub(crate) change_notify: Arc<Notify>, + table_sync_version: Mutex<HashMap<String, u64>>, + pub(crate) rpc_helper: RpcHelper, system_endpoint: Arc<Endpoint<SystemRpc, System>>, } @@ -117,6 +120,34 @@ impl LayoutManager { Ok(()) } + pub fn add_table(&self, table_name: &'static str) { + let first_version = self.layout().versions.first().unwrap().version; + + self.table_sync_version + .lock() + .unwrap() + .insert(table_name.to_string(), first_version); + } + + pub fn sync_table_until(self: &Arc<Self>, table_name: &'static str, version: u64) { + let mut table_sync_version = self.table_sync_version.lock().unwrap(); + *table_sync_version.get_mut(table_name).unwrap() = version; + let sync_until = table_sync_version.iter().map(|(_, v)| *v).max().unwrap(); + drop(table_sync_version); + + let mut layout = self.layout.write().unwrap(); + if layout + .update_trackers + .sync_map + .set_max(self.node_id, sync_until) + { + layout.update_hashes(); + self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( + layout.update_trackers.clone(), + )); + } + } + // ---- INTERNALS --- fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> { diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index db60c806..89f5c361 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -375,14 +375,17 @@ impl UpdateTracker { changed } - pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) { + pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) -> bool { match self.0.get_mut(&peer) { - Some(e) => { - *e = std::cmp::max(*e, value); + Some(e) if *e < value => { + *e = value; + true } None => { self.0.insert(peer, value); + true } + _ => false, } } } diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index 65c62f63..8133672a 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -109,7 +109,7 @@ impl LayoutVersion { .collect::<Vec<_>>() } - /// Walk the ring to find the n servers in which data should be replicated + /// Return the n servers in which data for this hash should be replicated pub fn nodes_of(&self, position: &Hash, n: usize) -> Vec<Uuid> { assert_eq!(n, self.replication_factor); diff --git a/src/table/table.rs b/src/table/table.rs index 3e3fd138..997fd7dc 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -80,6 +80,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> { let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone()); let gc = TableGc::new(system.clone(), data.clone()); + system.layout_manager.add_table(F::TABLE_NAME); + let table = Arc::new(Self { system, data, |