author    Alex <alex@adnab.me>  2024-01-11 10:58:08 +0000
committer Alex <alex@adnab.me>  2024-01-11 10:58:08 +0000
commit    8a6ec1d6111a60e602c90ade2200b2dab5733fe3 (patch)
tree      b8daac4f41050339c87106d72ce7224f7eef38aa /src/table/table.rs
parent    723e56b37f13f078a15e067343191fb1bf96e8b2 (diff)
parent    0041b013a473e3ae72f50209d8f79db75a72848b (diff)
Merge pull request 'NLnet task 3' (#667) from nlnet-task3 into next-0.10
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/667
Diffstat (limited to 'src/table/table.rs')
-rw-r--r--  src/table/table.rs  | 159
1 file changed, 116 insertions(+), 43 deletions(-)
diff --git a/src/table/table.rs b/src/table/table.rs
index 7ad79677..a5be2910 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -20,6 +20,7 @@ use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
use garage_util::migrate::Migrate;
+use garage_rpc::rpc_helper::QuorumSetResultTracker;
use garage_rpc::system::System;
use garage_rpc::*;
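
The new import is the helper that insert_many_internal drives further down. As a reading aid, here is a minimal standalone sketch of the contract this diff relies on: the method names are exactly the ones called later in this file, but the internals are an assumption, not the actual garage_rpc implementation.

use std::collections::HashMap;

type NodeId = u64; // stand-in for garage_util's Uuid

// Assumed semantics: record one result per node, and decide whether every
// write set has reached `quorum` successes, or whether some set has seen so
// many failures that a quorum is no longer possible.
struct TrackerSketch {
    sets: Vec<Vec<NodeId>>,
    quorum: usize,
    results: HashMap<NodeId, bool>, // node -> did its RPC succeed?
}

impl TrackerSketch {
    fn new(sets: &[&[NodeId]], quorum: usize) -> Self {
        Self {
            sets: sets.iter().map(|s| s.to_vec()).collect(),
            quorum,
            results: HashMap::new(),
        }
    }

    fn register_result(&mut self, node: NodeId, res: Result<(), ()>) {
        self.results.insert(node, res.is_ok());
    }

    fn all_quorums_ok(&self) -> bool {
        self.sets.iter().all(|set| {
            set.iter()
                .filter(|&n| self.results.get(n) == Some(&true))
                .count() >= self.quorum
        })
    }

    fn too_many_failures(&self) -> bool {
        self.sets.iter().any(|set| {
            let failed = set.iter()
                .filter(|&n| self.results.get(n) == Some(&false))
                .count();
            set.len() - failed < self.quorum
        })
    }
}

fn main() {
    // Node 1 belongs to both sets, so its single OK counts toward both.
    let (s1, s2): (&[NodeId], &[NodeId]) = (&[1, 2, 3], &[1, 4, 5]);
    let mut tracker = TrackerSketch::new(&[s1, s2], 2);
    tracker.register_result(1, Ok(()));
    tracker.register_result(2, Ok(()));
    tracker.register_result(4, Ok(()));
    assert!(tracker.all_quorums_ok());
}

Tracking results per node rather than per set matters because a node shared by several write sets should count toward every set it belongs to at once.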
@@ -80,6 +81,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone());
let gc = TableGc::new(system.clone(), data.clone());
+ system.layout_manager.add_table(F::TABLE_NAME);
+
let table = Arc::new(Self {
system,
data,
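
The add_table registration above plugs the table into the layout manager's "ack lock" mechanism described in the long comment inside insert_many_internal below: a newer layout version must not be acknowledged while writes started under the previous one are still in flight. A purely hypothetical sketch of that idea, with every name invented:

use std::sync::atomic::{AtomicUsize, Ordering};

struct AckLock {
    writes_in_flight: AtomicUsize,
}

struct AckLockGuard<'a> {
    lock: &'a AckLock,
}

impl AckLock {
    fn hold(&self) -> AckLockGuard<'_> {
        self.writes_in_flight.fetch_add(1, Ordering::SeqCst);
        AckLockGuard { lock: self }
    }

    fn can_ack_new_layout(&self) -> bool {
        self.writes_in_flight.load(Ordering::SeqCst) == 0
    }
}

impl Drop for AckLockGuard<'_> {
    fn drop(&mut self) {
        self.lock.writes_in_flight.fetch_sub(1, Ordering::SeqCst);
    }
}

fn main() {
    let lock = AckLock { writes_in_flight: AtomicUsize::new(0) };
    {
        let _guard = lock.hold(); // a write is in progress
        assert!(!lock.can_ack_new_layout());
    } // guard dropped, mirroring "released when entries_vec is dropped"
    assert!(lock.can_ack_new_layout());
}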
@@ -117,16 +120,16 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
async fn insert_internal(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
- let who = self.data.replication.write_nodes(&hash);
+ let who = self.data.replication.write_sets(&hash);
let e_enc = Arc::new(ByteBuf::from(e.encode()?));
let rpc = TableRpc::<F>::Update(vec![e_enc]);
self.system
- .rpc
- .try_call_many(
+ .rpc_helper()
+ .try_write_many_sets(
&self.endpoint,
- &who[..],
+ who.as_ref(),
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.write_quorum()),
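
The switch from write_nodes to write_sets (and from try_call_many to try_write_many_sets) is the heart of this change: while a layout transition is in progress, a partition has more than one set of replicas, and a write only succeeds once it reaches a quorum in each of them. A toy illustration of that success condition, with invented node ids and quorum:

fn main() {
    // Invented scenario: a layout transition is moving this partition
    // from {n1, n2, n3} to {n1, n4, n5}.
    let old_set = ["n1", "n2", "n3"];
    let new_set = ["n1", "n4", "n5"];
    let write_sets = [&old_set[..], &new_set[..]];
    let quorum = 2; // assumed write_quorum()

    // Suppose these nodes acknowledged the write:
    let acked = ["n1", "n2", "n4"];

    // try_write_many_sets-style success condition: a quorum in *every* set,
    // so the entry survives whichever layout version ends up authoritative.
    let ok = write_sets
        .iter()
        .all(|set| set.iter().filter(|n| acked.contains(n)).count() >= quorum);
    assert!(ok);
}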
@@ -141,7 +144,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
self.data.queue_insert(tx, e)
}
- pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error>
+ pub async fn insert_many<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
where
I: IntoIterator<Item = IE> + Send + Sync,
IE: Borrow<F::E> + Send + Sync,
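
insert_many now takes self: &Arc<Self> because the implementation below must clone an owned handle to the table into a background task (see the tokio::spawn near the end of insert_many_internal). A minimal standalone sketch of that receiver pattern, using a plain thread instead of tokio and invented names:

use std::sync::Arc;
use std::thread;

struct Table;

impl Table {
    // With plain `&self` there is nothing owned to move into the spawned
    // task; `self: &Arc<Self>` lets the method clone an owned handle.
    fn finish_in_background(self: &Arc<Self>) {
        let this = self.clone();
        thread::spawn(move || {
            let _table: &Table = &this; // `this` keeps the table alive
        });
    }
}

fn main() {
    Arc::new(Table).finish_in_background();
}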
@@ -159,51 +162,123 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
Ok(())
}
- async fn insert_many_internal<I, IE>(&self, entries: I) -> Result<(), Error>
+ async fn insert_many_internal<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
where
I: IntoIterator<Item = IE> + Send + Sync,
IE: Borrow<F::E> + Send + Sync,
{
- let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
-
+ // The different items will have to be stored on possibly different nodes.
+ // Here we batch the items into a single request per concerned node,
+ // containing all of the entries that node must store.
+ // Each entry has to be saved to a specific list of "write sets", i.e. a set
+ // of nodes within which a quorum must be achieved. In normal operation, there
+ // is a single write set which corresponds to the quorum in the current
+ // cluster layout, but when the layout is updated, multiple write sets might
+ // have to be handled at once. Here, since we are sending many entries, we
+ // will have to handle many write sets in all cases. The algorithm is thus
+ // to send one request to each node with all the items it must save,
+ // and keep track of the OK responses within each write set: if for all sets
+ // a quorum of nodes has answered OK, then the insert has succeeded and
+ // consistency properties (read-after-write) are preserved.
+
+ let quorum = self.data.replication.write_quorum();
+
+ // Serialize all entries and compute the write sets for each of them.
+ // In the case of sharded table replication, this also takes an "ack lock"
+ // in the layout manager, to avoid ack'ing newer layout versions that are not
+ // taken into account by writes in progress (the ack can happen later, once
+ // all writes that didn't take the new layout into account are finished).
+ // These locks are released when entries_vec is dropped, i.e. when this
+ // function returns.
+ let mut entries_vec = Vec::new();
for entry in entries.into_iter() {
let entry = entry.borrow();
let hash = entry.partition_key().hash();
- let who = self.data.replication.write_nodes(&hash);
+ let mut write_sets = self.data.replication.write_sets(&hash);
+ for set in write_sets.as_mut().iter_mut() {
+ // Sort the nodes in each write set, so that sets containing the same
+ // nodes in possibly different orders can be deduplicated below.
+ set.sort();
+ }
let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
- for node in who {
- call_list.entry(node).or_default().push(e_enc.clone());
+ entries_vec.push((write_sets, e_enc));
+ }
+
+ // Compute a deduplicated list of all of the write sets,
+ // and compute an index from each node to the positions of the sets in which
+ // it takes part, to optimize the detection of a quorum.
+ let mut write_sets = entries_vec
+ .iter()
+ .flat_map(|(wss, _)| wss.as_ref().iter().map(|ws| ws.as_slice()))
+ .collect::<Vec<&[Uuid]>>();
+ write_sets.sort();
+ write_sets.dedup();
+
+ let mut result_tracker = QuorumSetResultTracker::new(&write_sets, quorum);
+
+ // Build a map of all nodes to the entries that must be sent to that node.
+ let mut call_list: HashMap<Uuid, Vec<_>> = HashMap::new();
+ for (write_sets, entry_enc) in entries_vec.iter() {
+ for write_set in write_sets.as_ref().iter() {
+ for node in write_set.iter() {
+ let node_entries = call_list.entry(*node).or_default();
+ match node_entries.last() {
+ Some(x) if Arc::ptr_eq(x, entry_enc) => {
+ // skip if entry already in list to send to this node
+ // (could happen if node is in several write sets for this entry)
+ }
+ _ => {
+ node_entries.push(entry_enc.clone());
+ }
+ }
+ }
}
}
- let call_futures = call_list.drain().map(|(node, entries)| async move {
- let rpc = TableRpc::<F>::Update(entries);
-
- let resp = self
- .system
- .rpc
- .call(
- &self.endpoint,
- node,
- rpc,
- RequestStrategy::with_priority(PRIO_NORMAL),
- )
- .await?;
- Ok::<_, Error>((node, resp))
+ // Build futures to actually perform each of the corresponding RPC calls
+ let call_futures = call_list.into_iter().map(|(node, entries)| {
+ let this = self.clone();
+ async move {
+ let rpc = TableRpc::<F>::Update(entries);
+ let resp = this
+ .system
+ .rpc_helper()
+ .call(
+ &this.endpoint,
+ node,
+ rpc,
+ RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(quorum),
+ )
+ .await;
+ (node, resp)
+ }
});
+
+ // Run all requests in parallel thanks to FuturesUnordered, and collect results.
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
- let mut errors = vec![];
- while let Some(resp) = resps.next().await {
- if let Err(e) = resp {
- errors.push(e);
+ while let Some((node, resp)) = resps.next().await {
+ result_tracker.register_result(node, resp.map(|_| ()));
+
+ if result_tracker.all_quorums_ok() {
+ // Success
+
+ // Let the remaining requests continue in the background
+ tokio::spawn(async move {
+ resps.collect::<Vec<(Uuid, Result<_, _>)>>().await;
+ });
+
+ return Ok(());
+ }
+
+ if result_tracker.too_many_failures() {
+ // Too many failures in at least one set: a quorum can no longer be reached
+ break;
}
}
- if errors.len() > self.data.replication.max_write_errors() {
- Err(Error::Message("Too many errors".into()))
- } else {
- Ok(())
- }
+
+ // Failure: a quorum could not be reached in at least one set
+ Err(result_tracker.quorum_error())
}
pub async fn get(
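
One subtlety in the batching loop above: the duplicate check only inspects node_entries.last(). That is enough because all write sets of a given entry are visited consecutively, so a duplicate of the current entry can only ever be the most recent push. A standalone illustration of the Arc::ptr_eq shortcut, with an invented payload:

use std::sync::Arc;

fn main() {
    let entry = Arc::new(vec![1u8, 2, 3]); // stands in for the encoded entry
    let mut node_entries: Vec<Arc<Vec<u8>>> = Vec::new();

    // The same node appears in two write sets for this entry, so the entry
    // is offered twice in a row; the pointer comparison skips the duplicate
    // without hashing or comparing the payload bytes.
    for _ in 0..2 {
        match node_entries.last() {
            Some(prev) if Arc::ptr_eq(prev, &entry) => {} // already queued
            _ => node_entries.push(entry.clone()),
        }
    }
    assert_eq!(node_entries.len(), 1);
}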
@@ -236,14 +311,13 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self
.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
- &who[..],
+ &who,
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(self.data.replication.read_quorum())
- .interrupt_after_quorum(true),
+ .with_quorum(self.data.replication.read_quorum()),
)
.await?;
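
The read path keeps the quorum arithmetic that underpins the read-after-write guarantee mentioned in the insert_many_internal comment: as long as read and write quorums overlap, a successful read sees every acknowledged write. The concrete numbers below are the classic 3-replica parameters, an assumption for illustration only:

fn main() {
    // Assumed example values; only the inequality matters.
    let replication_factor = 3;
    let write_quorum = 2; // cf. self.data.replication.write_quorum()
    let read_quorum = 2;  // cf. self.data.replication.read_quorum()

    // If every write reaches write_quorum nodes and every read queries
    // read_quorum nodes, the two sets must share at least one node, so the
    // read cannot miss the latest acknowledged write.
    assert!(read_quorum + write_quorum > replication_factor);
}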
@@ -332,14 +406,13 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let resps = self
.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
- &who[..],
+ &who,
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(self.data.replication.read_quorum())
- .interrupt_after_quorum(true),
+ .with_quorum(self.data.replication.read_quorum()),
)
.await?;
@@ -411,7 +484,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(what.encode()?));
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
who,