aboutsummaryrefslogtreecommitdiff
path: root/src/table/table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/table.rs')
-rw-r--r--src/table/table.rs17
1 files changed, 2 insertions, 15 deletions
diff --git a/src/table/table.rs b/src/table/table.rs
index 8b16173e..1f6b7d25 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -61,9 +61,8 @@ pub trait TableReplication: Send + Sync {
// Which nodes to send writes to
fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
- fn write_quorum(&self) -> usize;
+ fn write_quorum(&self, system: &System) -> usize;
fn max_write_errors(&self) -> usize;
- fn epidemic_writes(&self) -> bool;
// Which are the nodes that do actually replicate the data
fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>;
@@ -119,7 +118,7 @@ where
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.replication.write_quorum())
+ RequestStrategy::with_quorum(self.replication.write_quorum(&self.system))
.with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
@@ -382,7 +381,6 @@ where
pub async fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
let syncer = self.syncer.load_full().unwrap();
- let mut epidemic_propagate = vec![];
for update_bytes in entries.iter() {
let update = self.decode_entry(update_bytes.as_slice())?;
@@ -410,22 +408,11 @@ where
})?;
if old_entry.as_ref() != Some(&new_entry) {
- if self.replication.epidemic_writes() {
- epidemic_propagate.push(new_entry.clone());
- }
-
self.instance.updated(old_entry, Some(new_entry));
syncer.invalidate(&tree_key[..]);
}
}
- if epidemic_propagate.len() > 0 {
- let self2 = self.clone();
- self.system
- .background
- .spawn_cancellable(async move { self2.insert_many(&epidemic_propagate[..]).await });
- }
-
Ok(())
}