diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-19 15:14:23 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-19 15:14:23 +0200 |
commit | 302502f4c10b4c1cd03d3b098b3e55a3f70054f2 (patch) | |
tree | 3e32c8751dc5d62b1723bcc2738aa77f12d45123 /src/table.rs | |
parent | 7131553c53d4414d2da0e9b60e6e3425f1b46ec2 (diff) | |
download | garage-302502f4c10b4c1cd03d3b098b3e55a3f70054f2.tar.gz garage-302502f4c10b4c1cd03d3b098b3e55a3f70054f2.zip |
Add support for fully replicated tables with epidemic dissemination of updates
Diffstat (limited to 'src/table.rs')
-rw-r--r-- | src/table.rs | 20 |
1 files changed, 17 insertions, 3 deletions
diff --git a/src/table.rs b/src/table.rs index d5357277..619c96d2 100644 --- a/src/table.rs +++ b/src/table.rs @@ -427,6 +427,8 @@ where self: &Arc<Self>, mut entries: Vec<Arc<ByteBuf>>, ) -> Result<(), Error> { + let mut epidemic_propagate = vec![]; + for update_bytes in entries.drain(..) { let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?; @@ -449,16 +451,28 @@ where .map_err(Error::RMPEncode) .map_err(sled::ConflictableTransactionError::Abort)?; db.insert(tree_key.clone(), new_bytes)?; - Ok((old_entry, Some(new_entry))) + Ok((old_entry, new_entry)) })?; - if old_entry != new_entry { - self.instance.updated(old_entry, new_entry).await; + if old_entry.as_ref() != Some(&new_entry) { + if self.replication.epidemic_writes() { + epidemic_propagate.push(new_entry.clone()); + } + + self.instance.updated(old_entry, Some(new_entry)).await; let syncer = self.syncer.load_full().unwrap(); self.system.background.spawn(syncer.invalidate(tree_key)); } } + + if epidemic_propagate.len() > 0 { + let self2 = self.clone(); + self.system + .background + .spawn(async move { self2.insert_many(&epidemic_propagate[..]).await }); + } + Ok(()) } |