diff options
author | Alex Auvolat <alex@adnab.me> | 2023-11-14 15:40:46 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-11-14 15:40:46 +0100 |
commit | 90e1619b1e9f5d81e59da371f04717f0c4fe5afc (patch) | |
tree | 914435938c6134bb959baaa4ffbd4f575730e9c0 /src/table/table.rs | |
parent | 3b361d2959e3d577bdae6f8a5ccb0c9d5526b7ea (diff) | |
download | garage-90e1619b1e9f5d81e59da371f04717f0c4fe5afc.tar.gz garage-90e1619b1e9f5d81e59da371f04717f0c4fe5afc.zip |
table: take into account multiple write sets in inserts
Diffstat (limited to 'src/table/table.rs')
-rw-r--r-- | src/table/table.rs | 17 |
1 files changed, 7 insertions, 10 deletions
diff --git a/src/table/table.rs b/src/table/table.rs index bf08d5a0..c2efaeaf 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -119,17 +119,16 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> { async fn insert_internal(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - // TODO: use write sets - let who = self.data.replication.storage_nodes(&hash); + let who = self.data.replication.write_sets(&hash); let e_enc = Arc::new(ByteBuf::from(e.encode()?)); let rpc = TableRpc::<F>::Update(vec![e_enc]); self.system .rpc_helper() - .try_call_many( + .try_write_many_sets( &self.endpoint, - &who[..], + &who, rpc, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.data.replication.write_quorum()), @@ -243,11 +242,10 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> { .rpc_helper() .try_call_many( &self.endpoint, - &who[..], + &who, rpc, RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.data.replication.read_quorum()) - .interrupt_after_quorum(true), + .with_quorum(self.data.replication.read_quorum()), ) .await?; @@ -339,11 +337,10 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> { .rpc_helper() .try_call_many( &self.endpoint, - &who[..], + &who, rpc, RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.data.replication.read_quorum()) - .interrupt_after_quorum(true), + .with_quorum(self.data.replication.read_quorum()), ) .await?; |