diff options
author | Alex Auvolat <alex@adnab.me> | 2023-11-16 16:41:45 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-11-16 16:41:45 +0100 |
commit | 3ecd14b9f6202ad3c5513c6ad7422bd408134002 (patch) | |
tree | 74d081ebc64dac8416affc7b4bacef54e4920c09 /src | |
parent | 22f38808e744ea5b30ad771fcb344a29579b56d4 (diff) | |
download | garage-3ecd14b9f6202ad3c5513c6ad7422bd408134002.tar.gz garage-3ecd14b9f6202ad3c5513c6ad7422bd408134002.zip |
table: implement write sets for insert_many
Diffstat (limited to 'src')
-rw-r--r-- | src/table/table.rs | 157 |
1 files changed, 127 insertions, 30 deletions
diff --git a/src/table/table.rs b/src/table/table.rs index 5ec9eb0a..7d1ff31c 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -143,7 +143,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> { self.data.queue_insert(tx, e) } - pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error> + pub async fn insert_many<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error> where I: IntoIterator<Item = IE> + Send + Sync, IE: Borrow<F::E> + Send + Sync, @@ -161,52 +161,149 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> { Ok(()) } - async fn insert_many_internal<I, IE>(&self, entries: I) -> Result<(), Error> + async fn insert_many_internal<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error> where I: IntoIterator<Item = IE> + Send + Sync, IE: Borrow<F::E> + Send + Sync, { - let mut call_list: HashMap<_, Vec<_>> = HashMap::new(); - + // The different items will have to be stored on possibly different nodes. + // We will here batch all items into a single request for each concerned + // node, with all of the entries it must store within that request. + // Each entry has to be saved to a specific list of "write sets", i.e. a set + // of node within wich a quorum must be achieved. In normal operation, there + // is a single write set which corresponds to the quorum in the current + // cluster layout, but when the layout is updated, multiple write sets might + // have to be handled at once. Here, since we are sending many entries, we + // will have to handle many write sets in all cases. The algorihtm is thus + // to send one request to each node with all the items it must save, + // and keep track of the OK responses within each write set: if for all sets + // a quorum of nodes has answered OK, then the insert has succeeded and + // consistency properties (read-after-write) are preserved. + + // Some code here might feel redundant with RpcHelper::try_write_many_sets, + // but I think deduplicating could lead to more spaghetti instead of + // improving the readability, so I'm leaving as is. + + let quorum = self.data.replication.write_quorum(); + + // Serialize all entries and compute the write sets for each of them. + // In the case of sharded table replication, this also takes an "ack lock" + // to the layout manager to avoid ack'ing newer versions which are not + // taken into account by writes in progress (the ack can happen later, once + // all writes that didn't take the new layout into account are finished). + // These locks are released when entries_vec is dropped, i.e. when this + // function returns. + let mut entries_vec = Vec::new(); for entry in entries.into_iter() { let entry = entry.borrow(); let hash = entry.partition_key().hash(); - // TODO: use write sets - let who = self.data.replication.storage_nodes(&hash); + let write_sets = self.data.replication.write_sets(&hash); let e_enc = Arc::new(ByteBuf::from(entry.encode()?)); - for node in who { - call_list.entry(node).or_default().push(e_enc.clone()); + entries_vec.push((write_sets, e_enc)); + } + + // Compute a deduplicated list of all of the write sets, + // and compute an index from each node to the position of the sets in which + // it takes part, to optimize the detection of a quorum. + let mut write_sets = entries_vec + .iter() + .map(|(wss, _)| wss.as_ref().iter().map(|ws| ws.as_slice())) + .flatten() + .collect::<Vec<&[Uuid]>>(); + write_sets.sort(); + write_sets.dedup(); + let mut write_set_index = HashMap::<&Uuid, Vec<usize>>::new(); + for (i, write_set) in write_sets.iter().enumerate() { + for node in write_set.iter() { + write_set_index.entry(node).or_default().push(i); } } - let call_futures = call_list.drain().map(|(node, entries)| async move { - let rpc = TableRpc::<F>::Update(entries); - - let resp = self - .system - .rpc_helper() - .call( - &self.endpoint, - node, - rpc, - RequestStrategy::with_priority(PRIO_NORMAL), - ) - .await?; - Ok::<_, Error>((node, resp)) + // Build a map of all nodes to the entries that must be sent to that node. + let mut call_list: HashMap<Uuid, Vec<_>> = HashMap::new(); + for (write_sets, entry_enc) in entries_vec.iter() { + for write_set in write_sets.as_ref().iter() { + for node in write_set.iter() { + call_list.entry(*node).or_default().push(entry_enc.clone()) + } + } + } + + // Build futures to actually perform each of the corresponding RPC calls + let call_count = call_list.len(); + let call_futures = call_list.into_iter().map(|(node, entries)| { + let this = self.clone(); + let tracer = opentelemetry::global::tracer("garage"); + let span = tracer.start(format!("RPC to {:?}", node)); + let fut = async move { + let rpc = TableRpc::<F>::Update(entries); + let resp = this + .system + .rpc_helper() + .call( + &this.endpoint, + node, + rpc, + RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(quorum), + ) + .await; + (node, resp) + }; + fut.with_context(Context::current_with_span(span)) }); + + // Run all requests in parallel thanks to FuturesUnordered, and collect results. let mut resps = call_futures.collect::<FuturesUnordered<_>>(); + let mut set_counters = vec![(0, 0); write_sets.len()]; + let mut successes = 0; let mut errors = vec![]; - while let Some(resp) = resps.next().await { - if let Err(e) = resp { - errors.push(e); + while let Some((node, resp)) = resps.next().await { + match resp { + Ok(_) => { + successes += 1; + for set in write_set_index.get(&node).unwrap().iter() { + set_counters[*set].0 += 1; + } + } + Err(e) => { + errors.push(e); + for set in write_set_index.get(&node).unwrap().iter() { + set_counters[*set].1 += 1; + } + } + } + + if set_counters.iter().all(|(ok_cnt, _)| *ok_cnt >= quorum) { + // Success + + // Continue all other requests in background + tokio::spawn(async move { + resps.collect::<Vec<(Uuid, Result<_, _>)>>().await; + }); + + return Ok(()); + } + + if set_counters + .iter() + .enumerate() + .any(|(i, (_, err_cnt))| err_cnt + quorum > write_sets[i].len()) + { + // Too many errors in this set, we know we won't get a quorum + break; } } - if errors.len() > self.data.replication.max_write_errors() { - Err(Error::Message("Too many errors".into())) - } else { - Ok(()) - } + + // Failure, could not get quorum within at least one set + let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>(); + Err(Error::Quorum( + quorum, + Some(write_sets.len()), + successes, + call_count, + errors, + )) } pub async fn get( |