diff options
Diffstat (limited to 'src/table_sync.rs')
-rw-r--r-- | src/table_sync.rs | 20 |
1 files changed, 10 insertions, 10 deletions
diff --git a/src/table_sync.rs b/src/table_sync.rs index 92aa8c2a..f6d4c750 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -96,7 +96,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { self: Arc<Self>, mut must_exit: watch::Receiver<bool>, ) -> Result<(), Error> { - tokio::time::delay_for(Duration::from_secs(10)); + tokio::time::delay_for(Duration::from_secs(10)).await; self.todo.lock().await.add_full_scan(&self.table); let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse(); @@ -111,12 +111,12 @@ impl<F: TableSchema + 'static> TableSyncer<F> { select! { _ = next_full_scan => { next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse(); - eprintln!("Adding full scan to syncer todo list"); + eprintln!("({}) Adding full scan to syncer todo list", self.table.name); self.todo.lock().await.add_full_scan(&self.table); } new_ring_r = s_ring_recv => { if let Some(new_ring) = new_ring_r { - eprintln!("Adding ring difference to syncer todo list"); + eprintln!("({}) Adding ring difference to syncer todo list", self.table.name); self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring); prev_ring = new_ring; } @@ -139,7 +139,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { if let Some(partition) = self.todo.lock().await.pop_task() { let res = self.clone().sync_partition(&partition, &mut must_exit).await; if let Err(e) = res { - eprintln!("Error while syncing {:?}: {}", partition, e); + eprintln!("({}) Error while syncing {:?}: {}", self.table.name, partition, e); } } else { tokio::time::delay_for(Duration::from_secs(1)).await; @@ -149,9 +149,9 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } async fn sync_partition(self: Arc<Self>, partition: &Partition, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> { - eprintln!("Calculating root checksum for {:?}...", partition); + eprintln!("({}) Calculating root checksum for {:?}...", self.table.name, partition); let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit).await?; - eprintln!("Root checksum for {:?}: {:?}", partition, root_cks); + eprintln!("({}) Root checksum for {:?}: {:?}", self.table.name, partition, root_cks); let nodes = self.table.system.ring.borrow().clone().walk_ring(&partition.begin, self.table.param.replication_factor); let mut sync_futures = nodes.iter() @@ -160,7 +160,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { while let Some(r) = sync_futures.next().await { if let Err(e) = r { - eprintln!("Sync error: {}", e); + eprintln!("({}) Sync error: {}", self.table.name, e); } } if !partition.retain { @@ -198,7 +198,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { let v = self.range_checksum_inner(&range, must_exit).await?; let mut cache = self.cache[range.level].lock().await; - eprintln!("Checksum for {:?}: {:?}", range, v); + eprintln!("({}) Checksum for {:?}: {:?}", self.table.name, range, v); cache.insert(range.clone(), v.clone()); Ok(v) }.boxed() @@ -281,7 +281,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { todo.push_back(root_ck); while !todo.is_empty() && !*must_exit.borrow() { - eprintln!("Sync with {:?}: {} remaining", who, todo.len()); + eprintln!("({}) Sync with {:?}: {} remaining", self.table.name, who, todo.len()); let end = std::cmp::min(16, todo.len()); let step = todo.drain(..end).collect::<Vec<_>>(); @@ -308,7 +308,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } async fn send_items(self: Arc<Self>, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> { - eprintln!("Sending {} items to {:?}", item_list.len(), who); + eprintln!("({}) Sending {} items to {:?}", self.table.name, item_list.len(), who); let mut values = vec![]; for item in item_list.iter() { |