From e7810e9cb3cdbe6aaecddddd1146bf15e5b50c7c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 24 Jun 2022 11:04:55 +0200 Subject: Smaller batches for index counter propagation --- src/block/repair.rs | 1 + src/model/index_counter.rs | 7 +++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/block/repair.rs b/src/block/repair.rs index a5c01629..97989780 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -53,6 +53,7 @@ impl Worker for RepairWorker { // This is mostly because the Rust bindings for SQLite assume a worst-case scenario // where SQLite is not compiled in thread-safe mode, so we have to wrap everything // in a mutex (see db/sqlite_adapter.rs and discussion in PR #322). + // TODO: maybe do this with tokio::task::spawn_blocking ? let mut batch_of_hashes = vec![]; let start_bound = match self.next_start.as_ref() { None => Bound::Unbounded, diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 474ec12c..9d5aa955 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -429,7 +429,8 @@ impl Worker for IndexPropagatorWorker { }; if !self.buf.is_empty() { - let entries = self.buf.iter().map(|(_k, v)| v); + let entries_k = self.buf.keys().take(100).cloned().collect::>(); + let entries = entries_k.iter().map(|k| self.buf.get(k).unwrap()); if let Err(e) = self.index_counter.table.insert_many(entries).await { self.errors += 1; if self.errors >= 2 && *must_exit.borrow() { @@ -441,7 +442,9 @@ impl Worker for IndexPropagatorWorker { // things to go back to normal return Err(e); } else { - self.buf.clear(); + for k in entries_k { + self.buf.remove(&k); + } self.errors = 0; } -- cgit v1.2.3