From 8a2b1dd422fb57abe611d8c1cf3cb0b55f487189 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 12:55:36 +0100 Subject: wip: split out layout management from System into separate LayoutManager --- src/table/table.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index 7ad79677..3e3fd138 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -123,7 +123,7 @@ impl Table { let rpc = TableRpc::::Update(vec![e_enc]); self.system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &who[..], @@ -181,7 +181,7 @@ impl Table { let resp = self .system - .rpc + .rpc_helper() .call( &self.endpoint, node, @@ -236,7 +236,7 @@ impl Table { let rpc = TableRpc::::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self .system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &who[..], @@ -332,7 +332,7 @@ impl Table { let resps = self .system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &who[..], @@ -411,7 +411,7 @@ impl Table { async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(what.encode()?)); self.system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, who, -- cgit v1.2.3 From df36cf3099f6010c4fc62109b85d4d1e62f160cc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 16:32:31 +0100 Subject: layout: add helpers to LayoutHistory and prepare integration with Table --- src/table/table.rs | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index 3e3fd138..997fd7dc 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -80,6 +80,8 @@ impl Table { let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone()); let gc = TableGc::new(system.clone(), data.clone()); + system.layout_manager.add_table(F::TABLE_NAME); + let table = Arc::new(Self { system, data, -- cgit v1.2.3 From 3b361d2959e3d577bdae6f8a5ccb0c9d5526b7ea Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 14:28:16 +0100 Subject: layout: prepare for write sets --- src/table/table.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index 997fd7dc..bf08d5a0 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -119,7 +119,8 @@ impl Table { async fn insert_internal(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let who = self.data.replication.write_nodes(&hash); + // TODO: use write sets + let who = self.data.replication.storage_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(e.encode()?)); let rpc = TableRpc::::Update(vec![e_enc]); @@ -171,7 +172,8 @@ impl Table { for entry in entries.into_iter() { let entry = entry.borrow(); let hash = entry.partition_key().hash(); - let who = self.data.replication.write_nodes(&hash); + // TODO: use write sets + let who = self.data.replication.storage_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(entry.encode()?)); for node in who { call_list.entry(node).or_default().push(e_enc.clone()); -- cgit v1.2.3 From 90e1619b1e9f5d81e59da371f04717f0c4fe5afc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 15:40:46 +0100 Subject: table: take into account multiple write sets in inserts --- src/table/table.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index bf08d5a0..c2efaeaf 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -119,17 +119,16 @@ impl Table { async fn insert_internal(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - // TODO: use write sets - let who = self.data.replication.storage_nodes(&hash); + let who = self.data.replication.write_sets(&hash); let e_enc = Arc::new(ByteBuf::from(e.encode()?)); let rpc = TableRpc::::Update(vec![e_enc]); self.system .rpc_helper() - .try_call_many( + .try_write_many_sets( &self.endpoint, - &who[..], + &who, rpc, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.data.replication.write_quorum()), @@ -243,11 +242,10 @@ impl Table { .rpc_helper() .try_call_many( &self.endpoint, - &who[..], + &who, rpc, RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.data.replication.read_quorum()) - .interrupt_after_quorum(true), + .with_quorum(self.data.replication.read_quorum()), ) .await?; @@ -339,11 +337,10 @@ impl Table { .rpc_helper() .try_call_many( &self.endpoint, - &who[..], + &who, rpc, RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.data.replication.read_quorum()) - .interrupt_after_quorum(true), + .with_quorum(self.data.replication.read_quorum()), ) .await?; -- cgit v1.2.3 From 33c8a489b0a9c0e869282bfc19c548f5a3e02e8c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 15 Nov 2023 15:40:44 +0100 Subject: layou: implement ack locking --- src/table/table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index c2efaeaf..5ec9eb0a 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -128,7 +128,7 @@ impl Table { .rpc_helper() .try_write_many_sets( &self.endpoint, - &who, + who.as_ref(), rpc, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.data.replication.write_quorum()), -- cgit v1.2.3 From 3ecd14b9f6202ad3c5513c6ad7422bd408134002 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 16 Nov 2023 16:41:45 +0100 Subject: table: implement write sets for insert_many --- src/table/table.rs | 157 +++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 127 insertions(+), 30 deletions(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index 5ec9eb0a..7d1ff31c 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -143,7 +143,7 @@ impl Table { self.data.queue_insert(tx, e) } - pub async fn insert_many(&self, entries: I) -> Result<(), Error> + pub async fn insert_many(self: &Arc, entries: I) -> Result<(), Error> where I: IntoIterator + Send + Sync, IE: Borrow + Send + Sync, @@ -161,52 +161,149 @@ impl Table { Ok(()) } - async fn insert_many_internal(&self, entries: I) -> Result<(), Error> + async fn insert_many_internal(self: &Arc, entries: I) -> Result<(), Error> where I: IntoIterator + Send + Sync, IE: Borrow + Send + Sync, { - let mut call_list: HashMap<_, Vec<_>> = HashMap::new(); - + // The different items will have to be stored on possibly different nodes. + // We will here batch all items into a single request for each concerned + // node, with all of the entries it must store within that request. + // Each entry has to be saved to a specific list of "write sets", i.e. a set + // of node within wich a quorum must be achieved. In normal operation, there + // is a single write set which corresponds to the quorum in the current + // cluster layout, but when the layout is updated, multiple write sets might + // have to be handled at once. Here, since we are sending many entries, we + // will have to handle many write sets in all cases. The algorihtm is thus + // to send one request to each node with all the items it must save, + // and keep track of the OK responses within each write set: if for all sets + // a quorum of nodes has answered OK, then the insert has succeeded and + // consistency properties (read-after-write) are preserved. + + // Some code here might feel redundant with RpcHelper::try_write_many_sets, + // but I think deduplicating could lead to more spaghetti instead of + // improving the readability, so I'm leaving as is. + + let quorum = self.data.replication.write_quorum(); + + // Serialize all entries and compute the write sets for each of them. + // In the case of sharded table replication, this also takes an "ack lock" + // to the layout manager to avoid ack'ing newer versions which are not + // taken into account by writes in progress (the ack can happen later, once + // all writes that didn't take the new layout into account are finished). + // These locks are released when entries_vec is dropped, i.e. when this + // function returns. + let mut entries_vec = Vec::new(); for entry in entries.into_iter() { let entry = entry.borrow(); let hash = entry.partition_key().hash(); - // TODO: use write sets - let who = self.data.replication.storage_nodes(&hash); + let write_sets = self.data.replication.write_sets(&hash); let e_enc = Arc::new(ByteBuf::from(entry.encode()?)); - for node in who { - call_list.entry(node).or_default().push(e_enc.clone()); + entries_vec.push((write_sets, e_enc)); + } + + // Compute a deduplicated list of all of the write sets, + // and compute an index from each node to the position of the sets in which + // it takes part, to optimize the detection of a quorum. + let mut write_sets = entries_vec + .iter() + .map(|(wss, _)| wss.as_ref().iter().map(|ws| ws.as_slice())) + .flatten() + .collect::>(); + write_sets.sort(); + write_sets.dedup(); + let mut write_set_index = HashMap::<&Uuid, Vec>::new(); + for (i, write_set) in write_sets.iter().enumerate() { + for node in write_set.iter() { + write_set_index.entry(node).or_default().push(i); } } - let call_futures = call_list.drain().map(|(node, entries)| async move { - let rpc = TableRpc::::Update(entries); - - let resp = self - .system - .rpc_helper() - .call( - &self.endpoint, - node, - rpc, - RequestStrategy::with_priority(PRIO_NORMAL), - ) - .await?; - Ok::<_, Error>((node, resp)) + // Build a map of all nodes to the entries that must be sent to that node. + let mut call_list: HashMap> = HashMap::new(); + for (write_sets, entry_enc) in entries_vec.iter() { + for write_set in write_sets.as_ref().iter() { + for node in write_set.iter() { + call_list.entry(*node).or_default().push(entry_enc.clone()) + } + } + } + + // Build futures to actually perform each of the corresponding RPC calls + let call_count = call_list.len(); + let call_futures = call_list.into_iter().map(|(node, entries)| { + let this = self.clone(); + let tracer = opentelemetry::global::tracer("garage"); + let span = tracer.start(format!("RPC to {:?}", node)); + let fut = async move { + let rpc = TableRpc::::Update(entries); + let resp = this + .system + .rpc_helper() + .call( + &this.endpoint, + node, + rpc, + RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(quorum), + ) + .await; + (node, resp) + }; + fut.with_context(Context::current_with_span(span)) }); + + // Run all requests in parallel thanks to FuturesUnordered, and collect results. let mut resps = call_futures.collect::>(); + let mut set_counters = vec![(0, 0); write_sets.len()]; + let mut successes = 0; let mut errors = vec![]; - while let Some(resp) = resps.next().await { - if let Err(e) = resp { - errors.push(e); + while let Some((node, resp)) = resps.next().await { + match resp { + Ok(_) => { + successes += 1; + for set in write_set_index.get(&node).unwrap().iter() { + set_counters[*set].0 += 1; + } + } + Err(e) => { + errors.push(e); + for set in write_set_index.get(&node).unwrap().iter() { + set_counters[*set].1 += 1; + } + } + } + + if set_counters.iter().all(|(ok_cnt, _)| *ok_cnt >= quorum) { + // Success + + // Continue all other requests in background + tokio::spawn(async move { + resps.collect::)>>().await; + }); + + return Ok(()); + } + + if set_counters + .iter() + .enumerate() + .any(|(i, (_, err_cnt))| err_cnt + quorum > write_sets[i].len()) + { + // Too many errors in this set, we know we won't get a quorum + break; } } - if errors.len() > self.data.replication.max_write_errors() { - Err(Error::Message("Too many errors".into())) - } else { - Ok(()) - } + + // Failure, could not get quorum within at least one set + let errors = errors.iter().map(|e| format!("{}", e)).collect::>(); + Err(Error::Quorum( + quorum, + Some(write_sets.len()), + successes, + call_count, + errors, + )) } pub async fn get( -- cgit v1.2.3 From 95eb13eb08d517d328e3c8aeb222440a27211ee9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Dec 2023 10:55:15 +0100 Subject: rpc: refactor result tracking for quorum sets --- src/table/table.rs | 54 +++++++++++------------------------------------------- 1 file changed, 11 insertions(+), 43 deletions(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index 7d1ff31c..6508cf5d 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -20,6 +20,7 @@ use garage_util::error::Error; use garage_util::metrics::RecordDuration; use garage_util::migrate::Migrate; +use garage_rpc::rpc_helper::QuorumSetResultTracker; use garage_rpc::system::System; use garage_rpc::*; @@ -180,10 +181,6 @@ impl Table { // a quorum of nodes has answered OK, then the insert has succeeded and // consistency properties (read-after-write) are preserved. - // Some code here might feel redundant with RpcHelper::try_write_many_sets, - // but I think deduplicating could lead to more spaghetti instead of - // improving the readability, so I'm leaving as is. - let quorum = self.data.replication.write_quorum(); // Serialize all entries and compute the write sets for each of them. @@ -197,7 +194,10 @@ impl Table { for entry in entries.into_iter() { let entry = entry.borrow(); let hash = entry.partition_key().hash(); - let write_sets = self.data.replication.write_sets(&hash); + let mut write_sets = self.data.replication.write_sets(&hash); + for set in write_sets.as_mut().iter_mut() { + set.sort(); + } let e_enc = Arc::new(ByteBuf::from(entry.encode()?)); entries_vec.push((write_sets, e_enc)); } @@ -212,12 +212,8 @@ impl Table { .collect::>(); write_sets.sort(); write_sets.dedup(); - let mut write_set_index = HashMap::<&Uuid, Vec>::new(); - for (i, write_set) in write_sets.iter().enumerate() { - for node in write_set.iter() { - write_set_index.entry(node).or_default().push(i); - } - } + + let mut result_tracker = QuorumSetResultTracker::new(&write_sets, quorum); // Build a map of all nodes to the entries that must be sent to that node. let mut call_list: HashMap> = HashMap::new(); @@ -230,7 +226,6 @@ impl Table { } // Build futures to actually perform each of the corresponding RPC calls - let call_count = call_list.len(); let call_futures = call_list.into_iter().map(|(node, entries)| { let this = self.clone(); let tracer = opentelemetry::global::tracer("garage"); @@ -254,27 +249,11 @@ impl Table { // Run all requests in parallel thanks to FuturesUnordered, and collect results. let mut resps = call_futures.collect::>(); - let mut set_counters = vec![(0, 0); write_sets.len()]; - let mut successes = 0; - let mut errors = vec![]; while let Some((node, resp)) = resps.next().await { - match resp { - Ok(_) => { - successes += 1; - for set in write_set_index.get(&node).unwrap().iter() { - set_counters[*set].0 += 1; - } - } - Err(e) => { - errors.push(e); - for set in write_set_index.get(&node).unwrap().iter() { - set_counters[*set].1 += 1; - } - } - } + result_tracker.register_result(node, resp.map(|_| ())); - if set_counters.iter().all(|(ok_cnt, _)| *ok_cnt >= quorum) { + if result_tracker.all_quorums_ok() { // Success // Continue all other requests in background @@ -285,25 +264,14 @@ impl Table { return Ok(()); } - if set_counters - .iter() - .enumerate() - .any(|(i, (_, err_cnt))| err_cnt + quorum > write_sets[i].len()) - { + if result_tracker.too_many_failures() { // Too many errors in this set, we know we won't get a quorum break; } } // Failure, could not get quorum within at least one set - let errors = errors.iter().map(|e| format!("{}", e)).collect::>(); - Err(Error::Quorum( - quorum, - Some(write_sets.len()), - successes, - call_count, - errors, - )) + Err(result_tracker.quorum_error()) } pub async fn get( -- cgit v1.2.3 From f8df90b79b93e4a1391839435718bad8c697246d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 8 Dec 2023 14:54:11 +0100 Subject: table: fix insert_many to not send duplicates --- src/table/table.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index 6508cf5d..59cfdd07 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -196,6 +196,8 @@ impl Table { let hash = entry.partition_key().hash(); let mut write_sets = self.data.replication.write_sets(&hash); for set in write_sets.as_mut().iter_mut() { + // Sort nodes in each write sets to merge write sets with same + // nodes but in possibly different orders set.sort(); } let e_enc = Arc::new(ByteBuf::from(entry.encode()?)); @@ -220,7 +222,16 @@ impl Table { for (write_sets, entry_enc) in entries_vec.iter() { for write_set in write_sets.as_ref().iter() { for node in write_set.iter() { - call_list.entry(*node).or_default().push(entry_enc.clone()) + let node_entries = call_list.entry(*node).or_default(); + match node_entries.last() { + Some(x) if Arc::ptr_eq(x, entry_enc) => { + // skip if entry already in list to send to this node + // (could happen if node is in several write sets for this entry) + } + _ => { + node_entries.push(entry_enc.clone()); + } + } } } } -- cgit v1.2.3 From e4f493b48156e6e30f16fba10f300f6cb5fe0b0d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 11 Dec 2023 14:57:42 +0100 Subject: table: remove redundant tracing in insert_many --- src/table/table.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index 59cfdd07..05a0dab1 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -239,9 +239,7 @@ impl Table { // Build futures to actually perform each of the corresponding RPC calls let call_futures = call_list.into_iter().map(|(node, entries)| { let this = self.clone(); - let tracer = opentelemetry::global::tracer("garage"); - let span = tracer.start(format!("RPC to {:?}", node)); - let fut = async move { + async move { let rpc = TableRpc::::Update(entries); let resp = this .system @@ -254,8 +252,7 @@ impl Table { ) .await; (node, resp) - }; - fut.with_context(Context::current_with_span(span)) + } }); // Run all requests in parallel thanks to FuturesUnordered, and collect results. -- cgit v1.2.3 From 85b5a6bcd11c0a7651e4c589569e1935a3d18e46 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 11 Dec 2023 15:31:47 +0100 Subject: fix some clippy lints --- src/table/table.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index 05a0dab1..a5be2910 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -209,8 +209,7 @@ impl Table { // it takes part, to optimize the detection of a quorum. let mut write_sets = entries_vec .iter() - .map(|(wss, _)| wss.as_ref().iter().map(|ws| ws.as_slice())) - .flatten() + .flat_map(|(wss, _)| wss.as_ref().iter().map(|ws| ws.as_slice())) .collect::>(); write_sets.sort(); write_sets.dedup(); -- cgit v1.2.3