aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-12-08 14:54:11 +0100
committerAlex Auvolat <alex@adnab.me>2023-12-08 14:54:11 +0100
commitf8df90b79b93e4a1391839435718bad8c697246d (patch)
treed6cf0dbbdd849a7beec9f2a004fca89217ae4af3
parent4dbf254512327ef4e7abbd5525b89bfa5b7ecb6f (diff)
downloadgarage-f8df90b79b93e4a1391839435718bad8c697246d.tar.gz
garage-f8df90b79b93e4a1391839435718bad8c697246d.zip
table: fix insert_many to not send duplicates
-rw-r--r--src/table/table.rs13
1 files changed, 12 insertions, 1 deletions
diff --git a/src/table/table.rs b/src/table/table.rs
index 6508cf5d..59cfdd07 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -196,6 +196,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let hash = entry.partition_key().hash();
let mut write_sets = self.data.replication.write_sets(&hash);
for set in write_sets.as_mut().iter_mut() {
+ // Sort nodes in each write sets to merge write sets with same
+ // nodes but in possibly different orders
set.sort();
}
let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
@@ -220,7 +222,16 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
for (write_sets, entry_enc) in entries_vec.iter() {
for write_set in write_sets.as_ref().iter() {
for node in write_set.iter() {
- call_list.entry(*node).or_default().push(entry_enc.clone())
+ let node_entries = call_list.entry(*node).or_default();
+ match node_entries.last() {
+ Some(x) if Arc::ptr_eq(x, entry_enc) => {
+ // skip if entry already in list to send to this node
+ // (could happen if node is in several write sets for this entry)
+ }
+ _ => {
+ node_entries.push(entry_enc.clone());
+ }
+ }
}
}
}