aboutsummaryrefslogtreecommitdiff
path: root/src/table.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-19 15:14:23 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-19 15:14:23 +0200
commit302502f4c10b4c1cd03d3b098b3e55a3f70054f2 (patch)
tree3e32c8751dc5d62b1723bcc2738aa77f12d45123 /src/table.rs
parent7131553c53d4414d2da0e9b60e6e3425f1b46ec2 (diff)
downloadgarage-302502f4c10b4c1cd03d3b098b3e55a3f70054f2.tar.gz
garage-302502f4c10b4c1cd03d3b098b3e55a3f70054f2.zip
Add support for fully replicated tables with epidemic dissemination of updates
Diffstat (limited to 'src/table.rs')
-rw-r--r--src/table.rs20
1 files changed, 17 insertions, 3 deletions
diff --git a/src/table.rs b/src/table.rs
index d5357277..619c96d2 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -427,6 +427,8 @@ where
self: &Arc<Self>,
mut entries: Vec<Arc<ByteBuf>>,
) -> Result<(), Error> {
+ let mut epidemic_propagate = vec![];
+
for update_bytes in entries.drain(..) {
let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?;
@@ -449,16 +451,28 @@ where
.map_err(Error::RMPEncode)
.map_err(sled::ConflictableTransactionError::Abort)?;
db.insert(tree_key.clone(), new_bytes)?;
- Ok((old_entry, Some(new_entry)))
+ Ok((old_entry, new_entry))
})?;
- if old_entry != new_entry {
- self.instance.updated(old_entry, new_entry).await;
+ if old_entry.as_ref() != Some(&new_entry) {
+ if self.replication.epidemic_writes() {
+ epidemic_propagate.push(new_entry.clone());
+ }
+
+ self.instance.updated(old_entry, Some(new_entry)).await;
let syncer = self.syncer.load_full().unwrap();
self.system.background.spawn(syncer.invalidate(tree_key));
}
}
+
+ if epidemic_propagate.len() > 0 {
+ let self2 = self.clone();
+ self.system
+ .background
+ .spawn(async move { self2.insert_many(&epidemic_propagate[..]).await });
+ }
+
Ok(())
}