aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rpc/layout/history.rs41
-rw-r--r--src/rpc/layout/manager.rs33
-rw-r--r--src/rpc/layout/schema.rs9
-rw-r--r--src/rpc/layout/version.rs2
-rw-r--r--src/table/table.rs2
5 files changed, 74 insertions, 13 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index e17a1c77..dbb02269 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -32,14 +32,6 @@ impl LayoutHistory {
self.versions.last().as_ref().unwrap()
}
- pub fn all_storage_nodes(&self) -> HashSet<Uuid> {
- self.versions
- .iter()
- .map(|x| x.nongateway_nodes())
- .flatten()
- .collect::<HashSet<_>>()
- }
-
pub fn update_hashes(&mut self) {
self.trackers_hash = self.calculate_trackers_hash();
self.staging_hash = self.calculate_staging_hash();
@@ -53,6 +45,39 @@ impl LayoutHistory {
blake2sum(&nonversioned_encode(&self.staging).unwrap()[..])
}
+ // ------------------ who stores what now? ---------------
+
+ pub fn max_ack(&self) -> u64 {
+ self.calculate_global_min(&self.update_trackers.ack_map)
+ }
+
+ pub fn all_storage_nodes(&self) -> HashSet<Uuid> {
+ // TODO: cache this
+ self.versions
+ .iter()
+ .map(|x| x.nongateway_nodes())
+ .flatten()
+ .collect::<HashSet<_>>()
+ }
+
+ pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
+ let sync_min = self.calculate_global_min(&self.update_trackers.sync_map);
+ let version = self
+ .versions
+ .iter()
+ .find(|x| x.version == sync_min)
+ .or(self.versions.last())
+ .unwrap();
+ version.nodes_of(position, version.replication_factor)
+ }
+
+ pub fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
+ self.versions
+ .iter()
+ .map(|x| x.nodes_of(position, x.replication_factor))
+ .collect::<Vec<_>>()
+ }
+
// ------------------ update tracking ---------------
pub(crate) fn update_trackers(&mut self, node_id: Uuid) {
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index c1417dac..b0302b12 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -1,4 +1,5 @@
-use std::sync::{Arc, RwLock, RwLockReadGuard};
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
use std::time::Duration;
use serde::{Deserialize, Serialize};
@@ -26,6 +27,8 @@ pub struct LayoutManager {
layout: Arc<RwLock<LayoutHistory>>,
pub(crate) change_notify: Arc<Notify>,
+ table_sync_version: Mutex<HashMap<String, u64>>,
+
pub(crate) rpc_helper: RpcHelper,
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
}
@@ -117,6 +120,34 @@ impl LayoutManager {
Ok(())
}
+ pub fn add_table(&self, table_name: &'static str) {
+ let first_version = self.layout().versions.first().unwrap().version;
+
+ self.table_sync_version
+ .lock()
+ .unwrap()
+ .insert(table_name.to_string(), first_version);
+ }
+
+ pub fn sync_table_until(self: &Arc<Self>, table_name: &'static str, version: u64) {
+ let mut table_sync_version = self.table_sync_version.lock().unwrap();
+ *table_sync_version.get_mut(table_name).unwrap() = version;
+ let sync_until = table_sync_version.iter().map(|(_, v)| *v).max().unwrap();
+ drop(table_sync_version);
+
+ let mut layout = self.layout.write().unwrap();
+ if layout
+ .update_trackers
+ .sync_map
+ .set_max(self.node_id, sync_until)
+ {
+ layout.update_hashes();
+ self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(
+ layout.update_trackers.clone(),
+ ));
+ }
+ }
+
// ---- INTERNALS ---
fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> {
diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs
index db60c806..89f5c361 100644
--- a/src/rpc/layout/schema.rs
+++ b/src/rpc/layout/schema.rs
@@ -375,14 +375,17 @@ impl UpdateTracker {
changed
}
- pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) {
+ pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) -> bool {
match self.0.get_mut(&peer) {
- Some(e) => {
- *e = std::cmp::max(*e, value);
+ Some(e) if *e < value => {
+ *e = value;
+ true
}
None => {
self.0.insert(peer, value);
+ true
}
+ _ => false,
}
}
}
diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs
index 65c62f63..8133672a 100644
--- a/src/rpc/layout/version.rs
+++ b/src/rpc/layout/version.rs
@@ -109,7 +109,7 @@ impl LayoutVersion {
.collect::<Vec<_>>()
}
- /// Walk the ring to find the n servers in which data should be replicated
+ /// Return the n servers in which data for this hash should be replicated
pub fn nodes_of(&self, position: &Hash, n: usize) -> Vec<Uuid> {
assert_eq!(n, self.replication_factor);
diff --git a/src/table/table.rs b/src/table/table.rs
index 3e3fd138..997fd7dc 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -80,6 +80,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone());
let gc = TableGc::new(system.clone(), data.clone());
+ system.layout_manager.add_table(F::TABLE_NAME);
+
let table = Arc::new(Self {
system,
data,