diff options
Diffstat (limited to 'src/table.rs')
-rw-r--r-- | src/table.rs | 20 |
1 files changed, 17 insertions, 3 deletions
diff --git a/src/table.rs b/src/table.rs index d5357277..619c96d2 100644 --- a/src/table.rs +++ b/src/table.rs @@ -427,6 +427,8 @@ where self: &Arc<Self>, mut entries: Vec<Arc<ByteBuf>>, ) -> Result<(), Error> { + let mut epidemic_propagate = vec![]; + for update_bytes in entries.drain(..) { let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?; @@ -449,16 +451,28 @@ where .map_err(Error::RMPEncode) .map_err(sled::ConflictableTransactionError::Abort)?; db.insert(tree_key.clone(), new_bytes)?; - Ok((old_entry, Some(new_entry))) + Ok((old_entry, new_entry)) })?; - if old_entry != new_entry { - self.instance.updated(old_entry, new_entry).await; + if old_entry.as_ref() != Some(&new_entry) { + if self.replication.epidemic_writes() { + epidemic_propagate.push(new_entry.clone()); + } + + self.instance.updated(old_entry, Some(new_entry)).await; let syncer = self.syncer.load_full().unwrap(); self.system.background.spawn(syncer.invalidate(tree_key)); } } + + if epidemic_propagate.len() > 0 { + let self2 = self.clone(); + self.system + .background + .spawn(async move { self2.insert_many(&epidemic_propagate[..]).await }); + } + Ok(()) } |