aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-11-11 12:37:33 +0100
committerAlex Auvolat <alex@adnab.me>2023-11-11 12:44:27 +0100
commitdf24bb806d64d5d5e748c35efe3f49ad3dda709e (patch)
tree4314a04ba2f01a297eedd0a45566551e74239ef2 /src
parentce89d1ddabe3b9e638b0173949726522ae9a0311 (diff)
downloadgarage-df24bb806d64d5d5e748c35efe3f49ad3dda709e.tar.gz
garage-df24bb806d64d5d5e748c35efe3f49ad3dda709e.zip
layout/sync: fix bugs and add tracing
Diffstat (limited to 'src')
-rw-r--r--src/rpc/layout/history.rs3
-rw-r--r--src/rpc/layout/manager.rs10
-rw-r--r--src/table/sync.rs60
3 files changed, 48 insertions, 25 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index 185dbb27..cef56647 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -131,7 +131,8 @@ impl LayoutHistory {
pub(crate) fn cleanup_old_versions(&mut self) {
let min_sync_ack = self.calculate_global_min(&self.update_trackers.sync_ack_map);
while self.versions.first().as_ref().unwrap().version < min_sync_ack {
- self.versions.remove(0);
+ let removed = self.versions.remove(0);
+ info!("Layout history: pruning old version {}", removed.version);
}
}
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index 7d60bae6..ce8b6f61 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -133,7 +133,7 @@ impl LayoutManager {
pub fn sync_table_until(self: &Arc<Self>, table_name: &'static str, version: u64) {
let mut table_sync_version = self.table_sync_version.lock().unwrap();
*table_sync_version.get_mut(table_name).unwrap() = version;
- let sync_until = table_sync_version.iter().map(|(_, v)| *v).max().unwrap();
+ let sync_until = table_sync_version.iter().map(|(_, v)| *v).min().unwrap();
drop(table_sync_version);
let mut layout = self.layout.write().unwrap();
@@ -142,6 +142,7 @@ impl LayoutManager {
.sync_map
.set_max(self.node_id, sync_until)
{
+ debug!("sync_until updated to {}", sync_until);
layout.update_hashes();
self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(
layout.update_trackers.clone(),
@@ -277,7 +278,12 @@ impl LayoutManager {
self: &Arc<Self>,
adv: &LayoutHistory,
) -> Result<SystemRpc, Error> {
- debug!("handle_advertise_cluster_layout: {:?}", adv);
+ debug!(
+ "handle_advertise_cluster_layout: {} versions, last={}, trackers={:?}",
+ adv.versions.len(),
+ adv.current().version,
+ adv.update_trackers
+ );
if adv.current().replication_factor != self.replication_factor {
let msg = format!(
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 43636faa..8c21db8b 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -488,8 +488,29 @@ struct SyncWorker<F: TableSchema, R: TableReplication> {
}
impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
+ fn check_add_full_sync(&mut self) {
+ let layout_versions = self.syncer.system.cluster_layout().sync_versions();
+ if layout_versions != self.layout_versions {
+ self.layout_versions = layout_versions;
+ info!(
+ "({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list",
+ F::TABLE_NAME,
+ layout_versions.0,
+ layout_versions.1,
+ layout_versions.2
+ );
+ self.add_full_sync();
+ }
+ }
+
fn add_full_sync(&mut self) {
let mut partitions = self.syncer.data.replication.sync_partitions();
+ info!(
+ "{}: Adding full sync for ack layout version {}",
+ F::TABLE_NAME,
+ partitions.layout_version
+ );
+
partitions.partitions.shuffle(&mut thread_rng());
self.todo = Some(partitions);
self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
@@ -510,6 +531,8 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
}
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ self.check_add_full_sync();
+
if let Some(todo) = &mut self.todo {
let partition = todo.partitions.pop().unwrap();
@@ -531,19 +554,23 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
return Err(e);
}
- // done
- if !todo.partitions.is_empty() {
- return Ok(WorkerState::Busy);
+ if todo.partitions.is_empty() {
+ info!(
+ "{}: Completed full sync for ack layout version {}",
+ F::TABLE_NAME,
+ todo.layout_version
+ );
+ self.syncer
+ .system
+ .layout_manager
+ .sync_table_until(F::TABLE_NAME, todo.layout_version);
+ self.todo = None;
}
- self.syncer
- .system
- .layout_manager
- .sync_table_until(F::TABLE_NAME, todo.layout_version);
+ Ok(WorkerState::Busy)
+ } else {
+ Ok(WorkerState::Idle)
}
-
- self.todo = None;
- Ok(WorkerState::Idle)
}
async fn wait_for_work(&mut self) -> WorkerState {
@@ -554,18 +581,7 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
}
},
_ = self.layout_notify.notified() => {
- let layout_versions = self.syncer.system.cluster_layout().sync_versions();
- if layout_versions != self.layout_versions {
- self.layout_versions = layout_versions;
- debug!(
- "({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list",
- F::TABLE_NAME,
- layout_versions.0,
- layout_versions.1,
- layout_versions.2
- );
- self.add_full_sync();
- }
+ self.check_add_full_sync();
},
_ = tokio::time::sleep_until(self.next_full_sync.into()) => {
self.add_full_sync();