From 7fdaf7aef0c2aa8b38dbc7dac630f6f9baf8f0a4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 12 Mar 2021 14:37:46 +0100 Subject: Fix merkle updater not being notified; improved logging --- src/api/s3_put.rs | 4 ++-- src/table/data.rs | 4 ++-- src/table/merkle.rs | 13 +++++++------ src/table/sync.rs | 13 +++++++------ 4 files changed, 18 insertions(+), 16 deletions(-) (limited to 'src') diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 2c5e364f..17732ced 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -472,8 +472,8 @@ pub async fn handle_complete_multipart_upload( }; // Check that the list of parts they gave us corresponds to the parts we have here - println!("Expected parts from request: {:?}", body_list_of_parts); - println!("Parts stored in version: {:?}", version.parts_etags.items()); + debug!("Expected parts from request: {:?}", body_list_of_parts); + debug!("Parts stored in version: {:?}", version.parts_etags.items()); let parts = version .parts_etags .items() diff --git a/src/table/data.rs b/src/table/data.rs index 6217bf6d..2817a849 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -142,7 +142,7 @@ where if let Some((old_entry, new_entry)) = changed { self.instance.updated(old_entry, Some(new_entry)); - //self.syncer.load_full().unwrap().invalidate(&tree_key[..]); + self.merkle_updater.todo_notify.notify(); } Ok(()) @@ -163,7 +163,7 @@ where if removed { let old_entry = self.decode_entry(v)?; self.instance.updated(Some(old_entry), None); - //self.syncer.load_full().unwrap().invalidate(k); + self.merkle_updater.todo_notify.notify(); } Ok(removed) } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 467ce615..a694c9e9 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -4,7 +4,7 @@ use std::time::Duration; use futures::select; use futures_util::future::*; -use log::{info, warn}; +use log::{debug, warn}; use serde::{Deserialize, Serialize}; use sled::transaction::{ ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree, @@ -109,11 +109,11 @@ impl MerkleUpdater { match x { Ok((key, valhash)) => { if let Err(e) = self.update_item(&key[..], &valhash[..]) { - warn!("Error while updating Merkle tree item: {}", e); + warn!("({}) Error while updating Merkle tree item: {}", self.table_name, e); } } Err(e) => { - warn!("Error while iterating on Merkle todo tree: {}", e); + warn!("({}) Error while iterating on Merkle todo tree: {}", self.table_name, e); tokio::time::delay_for(Duration::from_secs(10)).await; } } @@ -152,8 +152,9 @@ impl MerkleUpdater { .is_ok(); if !deleted { - info!( - "Item not deleted from Merkle todo because it changed: {:?}", + debug!( + "({}) Item not deleted from Merkle todo because it changed: {:?}", + self.table_name, k ); } @@ -195,7 +196,7 @@ impl MerkleUpdater { if children.len() == 0 { // should not happen - warn!("Replacing intermediate node with empty node, should not happen."); + warn!("({}) Replacing intermediate node with empty node, should not happen.", self.table_name); Some(MerkleNode::Empty) } else if children.len() == 1 { // We now have a single node (case when the update deleted one of only two diff --git a/src/table/sync.rs b/src/table/sync.rs index 07d48155..dbfa0a9f 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -274,7 +274,7 @@ where .into_iter() .collect::>(); if nodes.contains(&self.aux.system.id) { - warn!("Interrupting offload as partitions seem to have changed"); + warn!("({}) Interrupting offload as partitions seem to have changed", self.data.name); break; } if nodes.len() < self.aux.replication.write_quorum(&self.aux.system) { @@ -282,8 +282,9 @@ where } counter += 1; - debug!( - "Offloading {} items from {:?}..{:?} ({})", + info!( + "({}) Offloading {} items from {:?}..{:?} ({})", + self.data.name, items.len(), begin, end, @@ -325,7 +326,7 @@ where } if not_removed > 0 { - debug!("{} items not removed during offload because they changed in between (trying again...)", not_removed); + debug!("({}) {} items not removed during offload because they changed in between (trying again...)", self.data.name, not_removed); } Ok(()) @@ -448,11 +449,11 @@ where // Just send that item directly if let Some(val) = self.data.store.get(&ik[..])? { if blake2sum(&val[..]) != ivhash { - warn!("Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", ik); + warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik); } todo_items.push(val.to_vec()); } else { - warn!("Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", ik); + warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik); } } MerkleNode::Intermediate(l) => { -- cgit v1.2.3