diff options
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r-- | src/table/sync.rs | 42 |
1 files changed, 25 insertions, 17 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs index c5795f65..df9fb4d0 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -77,13 +77,13 @@ where ) -> Arc<Self> { let endpoint = system .netapp - .endpoint(format!("garage_table/sync.rs/Rpc:{}", data.name)); + .endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME)); let todo = SyncTodo { todo: vec![] }; let syncer = Arc::new(Self { system: system.clone(), - data: data.clone(), + data, merkle, todo: Mutex::new(todo), endpoint, @@ -95,13 +95,13 @@ where let s1 = syncer.clone(); system.background.spawn_worker( - format!("table sync watcher for {}", data.name), + format!("table sync watcher for {}", F::TABLE_NAME), move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx), ); let s2 = syncer.clone(); system.background.spawn_worker( - format!("table syncer for {}", data.name), + format!("table syncer for {}", F::TABLE_NAME), move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx), ); @@ -128,7 +128,7 @@ where _ = ring_recv.changed().fuse() => { let new_ring = ring_recv.borrow(); if !Arc::ptr_eq(&new_ring, &prev_ring) { - debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name); + debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME); self.add_full_sync(); prev_ring = new_ring.clone(); } @@ -146,7 +146,7 @@ where _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) { nothing_to_do_since = None; - debug!("({}) Interval passed, adding full sync to syncer todo list", self.data.name); + debug!("({}) Interval passed, adding full sync to syncer todo list", F::TABLE_NAME); self.add_full_sync(); } } @@ -177,7 +177,9 @@ where if let Err(e) = res { warn!( "({}) Error while syncing {:?}: {}", - self.data.name, partition, e + F::TABLE_NAME, + partition, + e ); } } else { @@ -205,7 +207,9 @@ where debug!( "({}) Syncing {:?} with {:?}...", - self.data.name, partition, nodes + F::TABLE_NAME, + partition, + nodes ); let mut sync_futures = nodes .iter() @@ -219,7 +223,7 @@ where while let Some(r) = sync_futures.next().await { if let Err(e) = r { n_errors += 1; - warn!("({}) Sync error: {}", self.data.name, e); + warn!("({}) Sync error: {}", F::TABLE_NAME, e); } } if n_errors > self.data.replication.max_write_errors() { @@ -272,7 +276,7 @@ where if nodes.contains(&self.system.id) { warn!( "({}) Interrupting offload as partitions seem to have changed", - self.data.name + F::TABLE_NAME ); break; } @@ -286,7 +290,7 @@ where counter += 1; info!( "({}) Offloading {} items from {:?}..{:?} ({})", - self.data.name, + F::TABLE_NAME, items.len(), begin, end, @@ -329,7 +333,7 @@ where } if not_removed > 0 { - debug!("({}) {} items not removed during offload because they changed in between (trying again...)", self.data.name, not_removed); + debug!("({}) {} items not removed during offload because they changed in between (trying again...)", F::TABLE_NAME, not_removed); } Ok(()) @@ -360,7 +364,9 @@ where if root_ck.is_empty() { debug!( "({}) Sync {:?} with {:?}: partition is empty.", - self.data.name, partition, who + F::TABLE_NAME, + partition, + who ); return Ok(()); } @@ -384,7 +390,9 @@ where SyncRpc::RootCkDifferent(false) => { debug!( "({}) Sync {:?} with {:?}: no difference", - self.data.name, partition, who + F::TABLE_NAME, + partition, + who ); return Ok(()); } @@ -413,11 +421,11 @@ where // Just send that item directly if let Some(val) = self.data.store.get(&ik[..])? { if blake2sum(&val[..]) != ivhash { - warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik); + warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik); } todo_items.push(val.to_vec()); } else { - warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik); + warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik); } } MerkleNode::Intermediate(l) => { @@ -482,7 +490,7 @@ where async fn send_items(&self, who: Uuid, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> { info!( "({}) Sending {} items to {:?}", - self.data.name, + F::TABLE_NAME, item_value_list.len(), who ); |