aboutsummaryrefslogtreecommitdiff
path: root/src/table/sync.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-11-11 12:37:33 +0100
committerAlex Auvolat <alex@adnab.me>2023-11-11 12:44:27 +0100
commitdf24bb806d64d5d5e748c35efe3f49ad3dda709e (patch)
tree4314a04ba2f01a297eedd0a45566551e74239ef2 /src/table/sync.rs
parentce89d1ddabe3b9e638b0173949726522ae9a0311 (diff)
downloadgarage-df24bb806d64d5d5e748c35efe3f49ad3dda709e.tar.gz
garage-df24bb806d64d5d5e748c35efe3f49ad3dda709e.zip
layout/sync: fix bugs and add tracing
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r--src/table/sync.rs60
1 files changed, 38 insertions, 22 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 43636faa..8c21db8b 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -488,8 +488,29 @@ struct SyncWorker<F: TableSchema, R: TableReplication> {
}
impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
+ fn check_add_full_sync(&mut self) {
+ let layout_versions = self.syncer.system.cluster_layout().sync_versions();
+ if layout_versions != self.layout_versions {
+ self.layout_versions = layout_versions;
+ info!(
+ "({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list",
+ F::TABLE_NAME,
+ layout_versions.0,
+ layout_versions.1,
+ layout_versions.2
+ );
+ self.add_full_sync();
+ }
+ }
+
fn add_full_sync(&mut self) {
let mut partitions = self.syncer.data.replication.sync_partitions();
+ info!(
+ "{}: Adding full sync for ack layout version {}",
+ F::TABLE_NAME,
+ partitions.layout_version
+ );
+
partitions.partitions.shuffle(&mut thread_rng());
self.todo = Some(partitions);
self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
@@ -510,6 +531,8 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
}
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ self.check_add_full_sync();
+
if let Some(todo) = &mut self.todo {
let partition = todo.partitions.pop().unwrap();
@@ -531,19 +554,23 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
return Err(e);
}
- // done
- if !todo.partitions.is_empty() {
- return Ok(WorkerState::Busy);
+ if todo.partitions.is_empty() {
+ info!(
+ "{}: Completed full sync for ack layout version {}",
+ F::TABLE_NAME,
+ todo.layout_version
+ );
+ self.syncer
+ .system
+ .layout_manager
+ .sync_table_until(F::TABLE_NAME, todo.layout_version);
+ self.todo = None;
}
- self.syncer
- .system
- .layout_manager
- .sync_table_until(F::TABLE_NAME, todo.layout_version);
+ Ok(WorkerState::Busy)
+ } else {
+ Ok(WorkerState::Idle)
}
-
- self.todo = None;
- Ok(WorkerState::Idle)
}
async fn wait_for_work(&mut self) -> WorkerState {
@@ -554,18 +581,7 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
}
},
_ = self.layout_notify.notified() => {
- let layout_versions = self.syncer.system.cluster_layout().sync_versions();
- if layout_versions != self.layout_versions {
- self.layout_versions = layout_versions;
- debug!(
- "({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list",
- F::TABLE_NAME,
- layout_versions.0,
- layout_versions.1,
- layout_versions.2
- );
- self.add_full_sync();
- }
+ self.check_add_full_sync();
},
_ = tokio::time::sleep_until(self.next_full_sync.into()) => {
self.add_full_sync();