From 12d1dbfc6b884be488e2d79c0b9e3c47490f5442 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 8 Nov 2023 15:41:24 +0100 Subject: remove Ring and use ClusterLayout everywhere --- src/table/merkle.rs | 2 +- src/table/replication/fullcopy.rs | 8 ++++---- src/table/replication/parameters.rs | 2 +- src/table/replication/sharded.rs | 14 +++++++------- src/table/sync.rs | 20 ++++++++++---------- 5 files changed, 23 insertions(+), 23 deletions(-) (limited to 'src/table') diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 4577f872..01271c58 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -13,7 +13,7 @@ use garage_util::data::*; use garage_util::encode::{nonversioned_decode, nonversioned_encode}; use garage_util::error::Error; -use garage_rpc::ring::*; +use garage_rpc::layout::*; use crate::data::*; use crate::replication::*; diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index 18682ace..f8b7cacc 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use garage_rpc::ring::*; +use garage_rpc::layout::*; use garage_rpc::system::System; use garage_util::data::*; @@ -27,11 +27,11 @@ impl TableReplication for TableFullReplication { } fn write_nodes(&self, _hash: &Hash) -> Vec { - let ring = self.system.ring.borrow(); - ring.layout.node_ids().to_vec() + let layout = self.system.layout_watch.borrow(); + layout.node_ids().to_vec() } fn write_quorum(&self) -> usize { - let nmembers = self.system.ring.borrow().layout.node_ids().len(); + let nmembers = self.system.layout_watch.borrow().node_ids().len(); if nmembers > self.max_faults { nmembers - self.max_faults } else { diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index f00815a2..19b306f2 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -1,4 +1,4 @@ -use garage_rpc::ring::*; +use garage_rpc::layout::*; use garage_util::data::*; /// Trait to describe how a table shall be replicated diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 1cf964af..95901a5a 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use garage_rpc::ring::*; +use garage_rpc::layout::*; use garage_rpc::system::System; use garage_util::data::*; @@ -26,16 +26,16 @@ pub struct TableShardedReplication { impl TableReplication for TableShardedReplication { fn read_nodes(&self, hash: &Hash) -> Vec { - let ring = self.system.ring.borrow(); - ring.get_nodes(hash, self.replication_factor) + let layout = self.system.layout_watch.borrow(); + layout.nodes_of(hash, self.replication_factor) } fn read_quorum(&self) -> usize { self.read_quorum } fn write_nodes(&self, hash: &Hash) -> Vec { - let ring = self.system.ring.borrow(); - ring.get_nodes(hash, self.replication_factor) + let layout = self.system.layout_watch.borrow(); + layout.nodes_of(hash, self.replication_factor) } fn write_quorum(&self) -> usize { self.write_quorum @@ -45,9 +45,9 @@ impl TableReplication for TableShardedReplication { } fn partition_of(&self, hash: &Hash) -> Partition { - self.system.ring.borrow().partition_of(hash) + self.system.layout_watch.borrow().partition_of(hash) } fn partitions(&self) -> Vec<(Partition, Hash)> { - self.system.ring.borrow().partitions() + self.system.layout_watch.borrow().partitions() } } diff --git a/src/table/sync.rs b/src/table/sync.rs index 92a353c6..b2600013 100644 --- 
a/src/table/sync.rs +++ b/src/table/sync.rs @@ -17,7 +17,7 @@ use garage_util::data::*; use garage_util::encode::{debug_serialize, nonversioned_encode}; use garage_util::error::{Error, OkOrMessage}; -use garage_rpc::ring::*; +use garage_rpc::layout::*; use garage_rpc::system::System; use garage_rpc::*; @@ -91,8 +91,8 @@ impl TableSyncer { bg.spawn_worker(SyncWorker { syncer: self.clone(), - ring_recv: self.system.ring.clone(), - ring: self.system.ring.borrow().clone(), + layout_watch: self.system.layout_watch.clone(), + layout: self.system.layout_watch.borrow().clone(), add_full_sync_rx, todo: vec![], next_full_sync: Instant::now() + Duration::from_secs(20), @@ -492,8 +492,8 @@ impl EndpointHandler for TableSync struct SyncWorker { syncer: Arc>, - ring_recv: watch::Receiver>, - ring: Arc, + layout_watch: watch::Receiver>, + layout: Arc, add_full_sync_rx: mpsc::UnboundedReceiver<()>, todo: Vec, next_full_sync: Instant, @@ -593,11 +593,11 @@ impl Worker for SyncWorker { self.add_full_sync(); } }, - _ = self.ring_recv.changed() => { - let new_ring = self.ring_recv.borrow(); - if !Arc::ptr_eq(&new_ring, &self.ring) { - self.ring = new_ring.clone(); - drop(new_ring); + _ = self.layout_watch.changed() => { + let new_layout = self.layout_watch.borrow(); + if !Arc::ptr_eq(&new_layout, &self.layout) { + self.layout = new_layout.clone(); + drop(new_layout); debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME); self.add_full_sync(); } -- cgit v1.2.3 From 4a9c94514f49aa4e9880a8e0f5cf5a52d11ae993 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 8 Nov 2023 16:41:00 +0100 Subject: avoid using layout_watch in System directly --- src/table/replication/fullcopy.rs | 4 ++-- src/table/replication/sharded.rs | 8 ++++---- src/table/sync.rs | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) (limited to 'src/table') diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index f8b7cacc..34807e3d 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -27,11 +27,11 @@ impl TableReplication for TableFullReplication { } fn write_nodes(&self, _hash: &Hash) -> Vec { - let layout = self.system.layout_watch.borrow(); + let layout = self.system.cluster_layout(); layout.node_ids().to_vec() } fn write_quorum(&self) -> usize { - let nmembers = self.system.layout_watch.borrow().node_ids().len(); + let nmembers = self.system.cluster_layout().node_ids().len(); if nmembers > self.max_faults { nmembers - self.max_faults } else { diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 95901a5a..60c95cb4 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -26,7 +26,7 @@ pub struct TableShardedReplication { impl TableReplication for TableShardedReplication { fn read_nodes(&self, hash: &Hash) -> Vec { - let layout = self.system.layout_watch.borrow(); + let layout = self.system.cluster_layout(); layout.nodes_of(hash, self.replication_factor) } fn read_quorum(&self) -> usize { @@ -34,7 +34,7 @@ impl TableReplication for TableShardedReplication { } fn write_nodes(&self, hash: &Hash) -> Vec { - let layout = self.system.layout_watch.borrow(); + let layout = self.system.cluster_layout(); layout.nodes_of(hash, self.replication_factor) } fn write_quorum(&self) -> usize { @@ -45,9 +45,9 @@ impl TableReplication for TableShardedReplication { } fn partition_of(&self, hash: &Hash) -> Partition { - self.system.layout_watch.borrow().partition_of(hash) + 
self.system.cluster_layout().partition_of(hash) } fn partitions(&self) -> Vec<(Partition, Hash)> { - self.system.layout_watch.borrow().partitions() + self.system.cluster_layout().partitions() } } diff --git a/src/table/sync.rs b/src/table/sync.rs index b2600013..65eff7cd 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -92,7 +92,7 @@ impl TableSyncer { bg.spawn_worker(SyncWorker { syncer: self.clone(), layout_watch: self.system.layout_watch.clone(), - layout: self.system.layout_watch.borrow().clone(), + layout: self.system.cluster_layout().clone(), add_full_sync_rx, todo: vec![], next_full_sync: Instant::now() + Duration::from_secs(20), -- cgit v1.2.3 From 8dccee3ccfe7793c42203f28c1e91c6f989b6899 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 8 Nov 2023 19:28:36 +0100 Subject: cluster layout: adapt all uses of ClusterLayout to LayoutHistory --- src/table/replication/fullcopy.rs | 5 ++--- src/table/replication/sharded.rs | 16 ++++++++++------ src/table/sync.rs | 4 ++-- 3 files changed, 14 insertions(+), 11 deletions(-) (limited to 'src/table') diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index 34807e3d..a5c83d0f 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -27,11 +27,10 @@ impl TableReplication for TableFullReplication { } fn write_nodes(&self, _hash: &Hash) -> Vec { - let layout = self.system.cluster_layout(); - layout.node_ids().to_vec() + self.system.cluster_layout().current().node_ids().to_vec() } fn write_quorum(&self) -> usize { - let nmembers = self.system.cluster_layout().node_ids().len(); + let nmembers = self.system.cluster_layout().current().node_ids().len(); if nmembers > self.max_faults { nmembers - self.max_faults } else { diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 60c95cb4..793d87fd 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -26,16 +26,20 @@ pub struct TableShardedReplication { impl TableReplication for TableShardedReplication { fn read_nodes(&self, hash: &Hash) -> Vec { - let layout = self.system.cluster_layout(); - layout.nodes_of(hash, self.replication_factor) + self.system + .cluster_layout() + .current() + .nodes_of(hash, self.replication_factor) } fn read_quorum(&self) -> usize { self.read_quorum } fn write_nodes(&self, hash: &Hash) -> Vec { - let layout = self.system.cluster_layout(); - layout.nodes_of(hash, self.replication_factor) + self.system + .cluster_layout() + .current() + .nodes_of(hash, self.replication_factor) } fn write_quorum(&self) -> usize { self.write_quorum @@ -45,9 +49,9 @@ impl TableReplication for TableShardedReplication { } fn partition_of(&self, hash: &Hash) -> Partition { - self.system.cluster_layout().partition_of(hash) + self.system.cluster_layout().current().partition_of(hash) } fn partitions(&self) -> Vec<(Partition, Hash)> { - self.system.cluster_layout().partitions() + self.system.cluster_layout().current().partitions() } } diff --git a/src/table/sync.rs b/src/table/sync.rs index 65eff7cd..620d83b9 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -492,8 +492,8 @@ impl EndpointHandler for TableSync struct SyncWorker { syncer: Arc>, - layout_watch: watch::Receiver>, - layout: Arc, + layout_watch: watch::Receiver>, + layout: Arc, add_full_sync_rx: mpsc::UnboundedReceiver<()>, todo: Vec, next_full_sync: Instant, -- cgit v1.2.3 From 8a2b1dd422fb57abe611d8c1cf3cb0b55f487189 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 12:55:36 
+0100 Subject: wip: split out layout management from System into separate LayoutManager --- src/table/gc.rs | 4 ++-- src/table/sync.rs | 10 +++++----- src/table/table.rs | 10 +++++----- 3 files changed, 12 insertions(+), 12 deletions(-) (limited to 'src/table') diff --git a/src/table/gc.rs b/src/table/gc.rs index 5b9124a7..2135a358 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -227,7 +227,7 @@ impl TableGc { // GC'ing is not a critical function of the system, so it's not a big // deal if we can't do it right now. self.system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &nodes[..], @@ -248,7 +248,7 @@ impl TableGc { // it means that the garbage collection wasn't completed and has // to be retried later. self.system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &nodes[..], diff --git a/src/table/sync.rs b/src/table/sync.rs index 620d83b9..2da1bfe7 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -91,7 +91,7 @@ impl TableSyncer { bg.spawn_worker(SyncWorker { syncer: self.clone(), - layout_watch: self.system.layout_watch.clone(), + layout_watch: self.system.layout_watch(), layout: self.system.cluster_layout().clone(), add_full_sync_rx, todo: vec![], @@ -244,7 +244,7 @@ impl TableSyncer { } self.system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, nodes, @@ -305,7 +305,7 @@ impl TableSyncer { // If so, do nothing. let root_resp = self .system - .rpc + .rpc_helper() .call( &self.endpoint, who, @@ -361,7 +361,7 @@ impl TableSyncer { // and compare it with local node let remote_node = match self .system - .rpc + .rpc_helper() .call( &self.endpoint, who, @@ -437,7 +437,7 @@ impl TableSyncer { let rpc_resp = self .system - .rpc + .rpc_helper() .call( &self.endpoint, who, diff --git a/src/table/table.rs b/src/table/table.rs index 7ad79677..3e3fd138 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -123,7 +123,7 @@ impl Table { let rpc = TableRpc::::Update(vec![e_enc]); self.system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &who[..], @@ -181,7 +181,7 @@ impl Table { let resp = self .system - .rpc + .rpc_helper() .call( &self.endpoint, node, @@ -236,7 +236,7 @@ impl Table { let rpc = TableRpc::::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self .system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &who[..], @@ -332,7 +332,7 @@ impl Table { let resps = self .system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &who[..], @@ -411,7 +411,7 @@ impl Table { async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(what.encode()?)); self.system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, who, -- cgit v1.2.3 From bfb1845fdc981a370539d641a5d80f438f184f07 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 14:12:05 +0100 Subject: layout: refactor to use a RwLock on LayoutHistory --- src/table/sync.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) (limited to 'src/table') diff --git a/src/table/sync.rs b/src/table/sync.rs index 2da1bfe7..4355bd9e 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -10,7 +10,7 @@ use rand::Rng; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use tokio::select; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{mpsc, watch, Notify}; use garage_util::background::*; use garage_util::data::*; @@ -91,8 +91,8 @@ impl TableSyncer { bg.spawn_worker(SyncWorker { syncer: self.clone(), - layout_watch: self.system.layout_watch(), - layout: 
self.system.cluster_layout().clone(), + layout_notify: self.system.layout_notify(), + layout_version: self.system.cluster_layout().current().version, add_full_sync_rx, todo: vec![], next_full_sync: Instant::now() + Duration::from_secs(20), @@ -492,8 +492,8 @@ impl EndpointHandler for TableSync struct SyncWorker { syncer: Arc>, - layout_watch: watch::Receiver>, - layout: Arc, + layout_notify: Arc, + layout_version: u64, add_full_sync_rx: mpsc::UnboundedReceiver<()>, todo: Vec, next_full_sync: Instant, @@ -593,12 +593,11 @@ impl Worker for SyncWorker { self.add_full_sync(); } }, - _ = self.layout_watch.changed() => { - let new_layout = self.layout_watch.borrow(); - if !Arc::ptr_eq(&new_layout, &self.layout) { - self.layout = new_layout.clone(); - drop(new_layout); - debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME); + _ = self.layout_notify.notified() => { + let new_version = self.syncer.system.cluster_layout().current().version; + if new_version > self.layout_version { + self.layout_version = new_version; + debug!("({}) Layout changed, adding full sync to syncer todo list", F::TABLE_NAME); self.add_full_sync(); } }, -- cgit v1.2.3 From df36cf3099f6010c4fc62109b85d4d1e62f160cc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 16:32:31 +0100 Subject: layout: add helpers to LayoutHistory and prepare integration with Table --- src/table/table.rs | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/table') diff --git a/src/table/table.rs b/src/table/table.rs index 3e3fd138..997fd7dc 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -80,6 +80,8 @@ impl Table { let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone()); let gc = TableGc::new(system.clone(), data.clone()); + system.layout_manager.add_table(F::TABLE_NAME); + let table = Arc::new(Self { system, data, -- cgit v1.2.3 From ce89d1ddabe3b9e638b0173949726522ae9a0311 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sat, 11 Nov 2023 12:08:32 +0100 Subject: table sync: adapt to new layout history --- src/table/replication/fullcopy.rs | 25 ++++- src/table/replication/parameters.rs | 19 +++- src/table/replication/sharded.rs | 39 +++++++- src/table/sync.rs | 178 +++++++++++++++--------------------- 4 files changed, 148 insertions(+), 113 deletions(-) (limited to 'src/table') diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index a5c83d0f..5653a229 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -1,3 +1,4 @@ +use std::iter::FromIterator; use std::sync::Arc; use garage_rpc::layout::*; @@ -6,10 +7,17 @@ use garage_util::data::*; use crate::replication::*; +// TODO: find a way to track layout changes for this as well +// The hard thing is that this data is stored also on gateway nodes, +// whereas sharded data is stored only on non-Gateway nodes (storage nodes) +// Also we want to be more tolerant to failures of gateways so we don't +// want to do too much holding back of data when progress of gateway +// nodes is not reported in the layout history's ack/sync/sync_ack maps. 
+ /// Full replication schema: all nodes store everything -/// Writes are disseminated in an epidemic manner in the network /// Advantage: do all reads locally, extremely fast /// Inconvenient: only suitable to reasonably small tables +/// Inconvenient: if some writes fail, nodes will read outdated data #[derive(Clone)] pub struct TableFullReplication { /// The membership manager of this node @@ -44,7 +52,18 @@ impl TableReplication for TableFullReplication { fn partition_of(&self, _hash: &Hash) -> Partition { 0u16 } - fn partitions(&self) -> Vec<(Partition, Hash)> { - vec![(0u16, [0u8; 32].into())] + + fn sync_partitions(&self) -> SyncPartitions { + let layout = self.system.cluster_layout(); + let layout_version = layout.current().version; + SyncPartitions { + layout_version, + partitions: vec![SyncPartition { + partition: 0u16, + first_hash: [0u8; 32].into(), + last_hash: [0xff; 32].into(), + storage_nodes: Vec::from_iter(layout.current().node_ids().to_vec()), + }], + } } } diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 19b306f2..2a7d3585 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -20,6 +20,21 @@ pub trait TableReplication: Send + Sync + 'static { // Accessing partitions, for Merkle tree & sync /// Get partition for data with given hash fn partition_of(&self, hash: &Hash) -> Partition; - /// List of existing partitions - fn partitions(&self) -> Vec<(Partition, Hash)>; + + /// List of partitions and nodes to sync with in current layout + fn sync_partitions(&self) -> SyncPartitions; +} + +#[derive(Debug)] +pub struct SyncPartitions { + pub layout_version: u64, + pub partitions: Vec, +} + +#[derive(Debug)] +pub struct SyncPartition { + pub partition: Partition, + pub first_hash: Hash, + pub last_hash: Hash, + pub storage_nodes: Vec, } diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 793d87fd..f02b1d66 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -51,7 +51,42 @@ impl TableReplication for TableShardedReplication { fn partition_of(&self, hash: &Hash) -> Partition { self.system.cluster_layout().current().partition_of(hash) } - fn partitions(&self) -> Vec<(Partition, Hash)> { - self.system.cluster_layout().current().partitions() + + fn sync_partitions(&self) -> SyncPartitions { + let layout = self.system.cluster_layout(); + let layout_version = layout.all_ack(); + + let mut partitions = layout + .current() + .partitions() + .map(|(partition, first_hash)| { + let mut storage_nodes = layout + .write_sets_of(&first_hash) + .map(|x| x.into_iter()) + .flatten() + .collect::>(); + storage_nodes.sort(); + storage_nodes.dedup(); + SyncPartition { + partition, + first_hash, + last_hash: [0u8; 32].into(), // filled in just after + storage_nodes, + } + }) + .collect::>(); + + for i in 0..partitions.len() { + partitions[i].last_hash = if i + 1 < partitions.len() { + partitions[i + 1].first_hash + } else { + [0xFFu8; 32].into() + }; + } + + SyncPartitions { + layout_version, + partitions, + } } } diff --git a/src/table/sync.rs b/src/table/sync.rs index 4355bd9e..43636faa 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -6,7 +6,7 @@ use arc_swap::ArcSwapOption; use async_trait::async_trait; use futures_util::stream::*; use opentelemetry::KeyValue; -use rand::Rng; +use rand::prelude::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use tokio::select; @@ -52,16 +52,6 @@ impl Rpc for SyncRpc { type Response = 
Result; } -#[derive(Debug, Clone)] -struct TodoPartition { - partition: Partition, - begin: Hash, - end: Hash, - - // Are we a node that stores this partition or not? - retain: bool, -} - impl TableSyncer { pub(crate) fn new( system: Arc, @@ -92,9 +82,9 @@ impl TableSyncer { bg.spawn_worker(SyncWorker { syncer: self.clone(), layout_notify: self.system.layout_notify(), - layout_version: self.system.cluster_layout().current().version, + layout_versions: self.system.cluster_layout().sync_versions(), add_full_sync_rx, - todo: vec![], + todo: None, next_full_sync: Instant::now() + Duration::from_secs(20), }); } @@ -112,31 +102,26 @@ impl TableSyncer { async fn sync_partition( self: &Arc, - partition: &TodoPartition, + partition: &SyncPartition, must_exit: &mut watch::Receiver, ) -> Result<(), Error> { - if partition.retain { - let my_id = self.system.id; - - let nodes = self - .data - .replication - .write_nodes(&partition.begin) - .into_iter() - .filter(|node| *node != my_id) - .collect::>(); + let my_id = self.system.id; + let retain = partition.storage_nodes.contains(&my_id); + if retain { debug!( "({}) Syncing {:?} with {:?}...", F::TABLE_NAME, partition, - nodes + partition.storage_nodes ); - let mut sync_futures = nodes + let mut sync_futures = partition + .storage_nodes .iter() + .filter(|node| **node != my_id) .map(|node| { self.clone() - .do_sync_with(partition.clone(), *node, must_exit.clone()) + .do_sync_with(&partition, *node, must_exit.clone()) }) .collect::>(); @@ -147,14 +132,14 @@ impl TableSyncer { warn!("({}) Sync error: {}", F::TABLE_NAME, e); } } - if n_errors > self.data.replication.max_write_errors() { + if n_errors > 0 { return Err(Error::Message(format!( - "Sync failed with too many nodes (should have been: {:?}).", - nodes + "Sync failed with {} nodes.", + n_errors ))); } } else { - self.offload_partition(&partition.begin, &partition.end, must_exit) + self.offload_partition(&partition.first_hash, &partition.last_hash, must_exit) .await?; } @@ -285,7 +270,7 @@ impl TableSyncer { async fn do_sync_with( self: Arc, - partition: TodoPartition, + partition: &SyncPartition, who: Uuid, must_exit: watch::Receiver, ) -> Result<(), Error> { @@ -492,76 +477,23 @@ impl EndpointHandler for TableSync struct SyncWorker { syncer: Arc>, + layout_notify: Arc, - layout_version: u64, + layout_versions: (u64, u64, u64), + add_full_sync_rx: mpsc::UnboundedReceiver<()>, - todo: Vec, next_full_sync: Instant, + + todo: Option, } impl SyncWorker { fn add_full_sync(&mut self) { - let system = &self.syncer.system; - let data = &self.syncer.data; - - let my_id = system.id; - - self.todo.clear(); - - let partitions = data.replication.partitions(); - - for i in 0..partitions.len() { - let begin = partitions[i].1; - - let end = if i + 1 < partitions.len() { - partitions[i + 1].1 - } else { - [0xFFu8; 32].into() - }; - - let nodes = data.replication.write_nodes(&begin); - - let retain = nodes.contains(&my_id); - if !retain { - // Check if we have some data to send, otherwise skip - match data.store.range(begin..end) { - Ok(mut iter) => { - if iter.next().is_none() { - continue; - } - } - Err(e) => { - warn!("DB error in add_full_sync: {}", e); - continue; - } - } - } - - self.todo.push(TodoPartition { - partition: partitions[i].0, - begin, - end, - retain, - }); - } - + let mut partitions = self.syncer.data.replication.sync_partitions(); + partitions.partitions.shuffle(&mut thread_rng()); + self.todo = Some(partitions); self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL; } - - fn pop_task(&mut 
self) -> Option { - if self.todo.is_empty() { - return None; - } - - let i = rand::thread_rng().gen_range(0..self.todo.len()); - if i == self.todo.len() - 1 { - self.todo.pop() - } else { - let replacement = self.todo.pop().unwrap(); - let ret = std::mem::replace(&mut self.todo[i], replacement); - Some(ret) - } - } } #[async_trait] @@ -572,18 +504,46 @@ impl Worker for SyncWorker { fn status(&self) -> WorkerStatus { WorkerStatus { - queue_length: Some(self.todo.len() as u64), + queue_length: Some(self.todo.as_ref().map(|x| x.partitions.len()).unwrap_or(0) as u64), ..Default::default() } } async fn work(&mut self, must_exit: &mut watch::Receiver) -> Result { - if let Some(partition) = self.pop_task() { - self.syncer.sync_partition(&partition, must_exit).await?; - Ok(WorkerState::Busy) - } else { - Ok(WorkerState::Idle) + if let Some(todo) = &mut self.todo { + let partition = todo.partitions.pop().unwrap(); + + // process partition + if let Err(e) = self.syncer.sync_partition(&partition, must_exit).await { + error!( + "{}: Failed to sync partition {:?}: {}", + F::TABLE_NAME, + partition, + e + ); + // if error, put partition back at the other side of the queue, + // so that other partitions will be tried in the meantime + todo.partitions.insert(0, partition); + // TODO: returning an error here will cause the background job worker + // to delay this task for some time, but maybe we don't want to + // delay it if there are lots of failures from nodes that are gone + // (we also don't want zero delays as that will cause lots of useless retries) + return Err(e); + } + + // done + if !todo.partitions.is_empty() { + return Ok(WorkerState::Busy); + } + + self.syncer + .system + .layout_manager + .sync_table_until(F::TABLE_NAME, todo.layout_version); } + + self.todo = None; + Ok(WorkerState::Idle) } async fn wait_for_work(&mut self) -> WorkerState { @@ -594,10 +554,16 @@ impl Worker for SyncWorker { } }, _ = self.layout_notify.notified() => { - let new_version = self.syncer.system.cluster_layout().current().version; - if new_version > self.layout_version { - self.layout_version = new_version; - debug!("({}) Layout changed, adding full sync to syncer todo list", F::TABLE_NAME); + let layout_versions = self.syncer.system.cluster_layout().sync_versions(); + if layout_versions != self.layout_versions { + self.layout_versions = layout_versions; + debug!( + "({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list", + F::TABLE_NAME, + layout_versions.0, + layout_versions.1, + layout_versions.2 + ); self.add_full_sync(); } }, @@ -605,9 +571,9 @@ impl Worker for SyncWorker { self.add_full_sync(); } } - match self.todo.is_empty() { - false => WorkerState::Busy, - true => WorkerState::Idle, + match self.todo.is_some() { + true => WorkerState::Busy, + false => WorkerState::Idle, } } } -- cgit v1.2.3 From df24bb806d64d5d5e748c35efe3f49ad3dda709e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sat, 11 Nov 2023 12:37:33 +0100 Subject: layout/sync: fix bugs and add tracing --- src/table/sync.rs | 60 +++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 22 deletions(-) (limited to 'src/table') diff --git a/src/table/sync.rs b/src/table/sync.rs index 43636faa..8c21db8b 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -488,8 +488,29 @@ struct SyncWorker { } impl SyncWorker { + fn check_add_full_sync(&mut self) { + let layout_versions = self.syncer.system.cluster_layout().sync_versions(); + if layout_versions != 
self.layout_versions { + self.layout_versions = layout_versions; + info!( + "({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list", + F::TABLE_NAME, + layout_versions.0, + layout_versions.1, + layout_versions.2 + ); + self.add_full_sync(); + } + } + fn add_full_sync(&mut self) { let mut partitions = self.syncer.data.replication.sync_partitions(); + info!( + "{}: Adding full sync for ack layout version {}", + F::TABLE_NAME, + partitions.layout_version + ); + partitions.partitions.shuffle(&mut thread_rng()); self.todo = Some(partitions); self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL; @@ -510,6 +531,8 @@ impl Worker for SyncWorker { } async fn work(&mut self, must_exit: &mut watch::Receiver) -> Result { + self.check_add_full_sync(); + if let Some(todo) = &mut self.todo { let partition = todo.partitions.pop().unwrap(); @@ -531,19 +554,23 @@ impl Worker for SyncWorker { return Err(e); } - // done - if !todo.partitions.is_empty() { - return Ok(WorkerState::Busy); + if todo.partitions.is_empty() { + info!( + "{}: Completed full sync for ack layout version {}", + F::TABLE_NAME, + todo.layout_version + ); + self.syncer + .system + .layout_manager + .sync_table_until(F::TABLE_NAME, todo.layout_version); + self.todo = None; } - self.syncer - .system - .layout_manager - .sync_table_until(F::TABLE_NAME, todo.layout_version); + Ok(WorkerState::Busy) + } else { + Ok(WorkerState::Idle) } - - self.todo = None; - Ok(WorkerState::Idle) } async fn wait_for_work(&mut self) -> WorkerState { @@ -554,18 +581,7 @@ impl Worker for SyncWorker { } }, _ = self.layout_notify.notified() => { - let layout_versions = self.syncer.system.cluster_layout().sync_versions(); - if layout_versions != self.layout_versions { - self.layout_versions = layout_versions; - debug!( - "({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list", - F::TABLE_NAME, - layout_versions.0, - layout_versions.1, - layout_versions.2 - ); - self.add_full_sync(); - } + self.check_add_full_sync(); }, _ = tokio::time::sleep_until(self.next_full_sync.into()) => { self.add_full_sync(); -- cgit v1.2.3 From 1aab1f4e688ebc3f3adcb41c817c16c688a3291c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 13:06:16 +0100 Subject: layout: refactoring of all_nodes --- src/table/replication/fullcopy.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src/table') diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index 5653a229..beaacc2b 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -35,10 +35,10 @@ impl TableReplication for TableFullReplication { } fn write_nodes(&self, _hash: &Hash) -> Vec { - self.system.cluster_layout().current().node_ids().to_vec() + self.system.cluster_layout().current().all_nodes().to_vec() } fn write_quorum(&self) -> usize { - let nmembers = self.system.cluster_layout().current().node_ids().len(); + let nmembers = self.system.cluster_layout().current().all_nodes().len(); if nmembers > self.max_faults { nmembers - self.max_faults } else { @@ -62,7 +62,7 @@ impl TableReplication for TableFullReplication { partition: 0u16, first_hash: [0u8; 32].into(), last_hash: [0xff; 32].into(), - storage_nodes: Vec::from_iter(layout.current().node_ids().to_vec()), + storage_nodes: Vec::from_iter(layout.current().all_nodes().to_vec()), }], } } -- cgit v1.2.3 From 3b361d2959e3d577bdae6f8a5ccb0c9d5526b7ea Mon Sep 17 00:00:00 2001 From: Alex Auvolat 
Date: Tue, 14 Nov 2023 14:28:16 +0100 Subject: layout: prepare for write sets --- src/table/data.rs | 3 ++- src/table/gc.rs | 2 +- src/table/replication/fullcopy.rs | 9 +++++++-- src/table/replication/parameters.rs | 8 +++++--- src/table/replication/sharded.rs | 24 ++++++++---------------- src/table/sync.rs | 2 +- src/table/table.rs | 6 ++++-- 7 files changed, 28 insertions(+), 26 deletions(-) (limited to 'src/table') diff --git a/src/table/data.rs b/src/table/data.rs index bbfdf58b..7f6b7847 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -254,7 +254,8 @@ impl TableData { // of the GC algorithm, as in all cases GC is suspended if // any node of the partition is unavailable. let pk_hash = Hash::try_from(&tree_key[..32]).unwrap(); - let nodes = self.replication.write_nodes(&pk_hash); + // TODO: this probably breaks when the layout changes + let nodes = self.replication.storage_nodes(&pk_hash); if nodes.first() == Some(&self.system.id) { GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?; } diff --git a/src/table/gc.rs b/src/table/gc.rs index 2135a358..002cfbf4 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -152,7 +152,7 @@ impl TableGc { let mut partitions = HashMap::new(); for entry in entries { let pkh = Hash::try_from(&entry.key[..32]).unwrap(); - let mut nodes = self.data.replication.write_nodes(&pkh); + let mut nodes = self.data.replication.storage_nodes(&pkh); nodes.retain(|x| *x != self.system.id); nodes.sort(); diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index beaacc2b..cb5471af 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -27,6 +27,11 @@ pub struct TableFullReplication { } impl TableReplication for TableFullReplication { + fn storage_nodes(&self, _hash: &Hash) -> Vec { + let layout = self.system.cluster_layout(); + layout.current().all_nodes().to_vec() + } + fn read_nodes(&self, _hash: &Hash) -> Vec { vec![self.system.id] } @@ -34,8 +39,8 @@ impl TableReplication for TableFullReplication { 1 } - fn write_nodes(&self, _hash: &Hash) -> Vec { - self.system.cluster_layout().current().all_nodes().to_vec() + fn write_sets(&self, hash: &Hash) -> Vec> { + vec![self.storage_nodes(hash)] } fn write_quorum(&self) -> usize { let nmembers = self.system.cluster_layout().current().all_nodes().len(); diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 2a7d3585..2f842409 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -6,21 +6,23 @@ pub trait TableReplication: Send + Sync + 'static { // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods + /// The entire list of all nodes that store a partition + fn storage_nodes(&self, hash: &Hash) -> Vec; + /// Which nodes to send read requests to fn read_nodes(&self, hash: &Hash) -> Vec; /// Responses needed to consider a read succesfull fn read_quorum(&self) -> usize; /// Which nodes to send writes to - fn write_nodes(&self, hash: &Hash) -> Vec; - /// Responses needed to consider a write succesfull + fn write_sets(&self, hash: &Hash) -> Vec>; + /// Responses needed to consider a write succesfull in each set fn write_quorum(&self) -> usize; fn max_write_errors(&self) -> usize; // Accessing partitions, for Merkle tree & sync /// Get partition for data with given hash fn partition_of(&self, hash: &Hash) -> Partition; - /// List of partitions and nodes to sync with in current layout fn sync_partitions(&self) -> 
SyncPartitions; } diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index f02b1d66..1320a189 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -25,21 +25,19 @@ pub struct TableShardedReplication { } impl TableReplication for TableShardedReplication { + fn storage_nodes(&self, hash: &Hash) -> Vec { + self.system.cluster_layout().storage_nodes_of(hash) + } + fn read_nodes(&self, hash: &Hash) -> Vec { - self.system - .cluster_layout() - .current() - .nodes_of(hash, self.replication_factor) + self.system.cluster_layout().read_nodes_of(hash) } fn read_quorum(&self) -> usize { self.read_quorum } - fn write_nodes(&self, hash: &Hash) -> Vec { - self.system - .cluster_layout() - .current() - .nodes_of(hash, self.replication_factor) + fn write_sets(&self, hash: &Hash) -> Vec> { + self.system.cluster_layout().write_sets_of(hash) } fn write_quorum(&self) -> usize { self.write_quorum @@ -60,13 +58,7 @@ impl TableReplication for TableShardedReplication { .current() .partitions() .map(|(partition, first_hash)| { - let mut storage_nodes = layout - .write_sets_of(&first_hash) - .map(|x| x.into_iter()) - .flatten() - .collect::>(); - storage_nodes.sort(); - storage_nodes.dedup(); + let storage_nodes = layout.storage_nodes_of(&first_hash); SyncPartition { partition, first_hash, diff --git a/src/table/sync.rs b/src/table/sync.rs index 8c21db8b..b67cdd79 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -176,7 +176,7 @@ impl TableSyncer { let nodes = self .data .replication - .write_nodes(begin) + .storage_nodes(begin) .into_iter() .collect::>(); if nodes.contains(&self.system.id) { diff --git a/src/table/table.rs b/src/table/table.rs index 997fd7dc..bf08d5a0 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -119,7 +119,8 @@ impl Table { async fn insert_internal(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let who = self.data.replication.write_nodes(&hash); + // TODO: use write sets + let who = self.data.replication.storage_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(e.encode()?)); let rpc = TableRpc::::Update(vec![e_enc]); @@ -171,7 +172,8 @@ impl Table { for entry in entries.into_iter() { let entry = entry.borrow(); let hash = entry.partition_key().hash(); - let who = self.data.replication.write_nodes(&hash); + // TODO: use write sets + let who = self.data.replication.storage_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(entry.encode()?)); for node in who { call_list.entry(node).or_default().push(e_enc.clone()); -- cgit v1.2.3 From 90e1619b1e9f5d81e59da371f04717f0c4fe5afc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 15:40:46 +0100 Subject: table: take into account multiple write sets in inserts --- src/table/gc.rs | 4 ++-- src/table/table.rs | 17 +++++++---------- 2 files changed, 9 insertions(+), 12 deletions(-) (limited to 'src/table') diff --git a/src/table/gc.rs b/src/table/gc.rs index 002cfbf4..ef788749 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -230,7 +230,7 @@ impl TableGc { .rpc_helper() .try_call_many( &self.endpoint, - &nodes[..], + &nodes, GcRpc::Update(updates), RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()), ) @@ -251,7 +251,7 @@ impl TableGc { .rpc_helper() .try_call_many( &self.endpoint, - &nodes[..], + &nodes, GcRpc::DeleteIfEqualHash(deletes), RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()), ) diff --git a/src/table/table.rs b/src/table/table.rs index bf08d5a0..c2efaeaf 100644 --- 
a/src/table/table.rs +++ b/src/table/table.rs @@ -119,17 +119,16 @@ impl Table { async fn insert_internal(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - // TODO: use write sets - let who = self.data.replication.storage_nodes(&hash); + let who = self.data.replication.write_sets(&hash); let e_enc = Arc::new(ByteBuf::from(e.encode()?)); let rpc = TableRpc::::Update(vec![e_enc]); self.system .rpc_helper() - .try_call_many( + .try_write_many_sets( &self.endpoint, - &who[..], + &who, rpc, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.data.replication.write_quorum()), @@ -243,11 +242,10 @@ impl Table { .rpc_helper() .try_call_many( &self.endpoint, - &who[..], + &who, rpc, RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.data.replication.read_quorum()) - .interrupt_after_quorum(true), + .with_quorum(self.data.replication.read_quorum()), ) .await?; @@ -339,11 +337,10 @@ impl Table { .rpc_helper() .try_call_many( &self.endpoint, - &who[..], + &who, rpc, RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.data.replication.read_quorum()) - .interrupt_after_quorum(true), + .with_quorum(self.data.replication.read_quorum()), ) .await?; -- cgit v1.2.3 From 33c8a489b0a9c0e869282bfc19c548f5a3e02e8c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 15 Nov 2023 15:40:44 +0100 Subject: layou: implement ack locking --- src/table/replication/fullcopy.rs | 4 +++- src/table/replication/parameters.rs | 4 +++- src/table/replication/sharded.rs | 6 ++++-- src/table/sync.rs | 9 ++------- src/table/table.rs | 2 +- 5 files changed, 13 insertions(+), 12 deletions(-) (limited to 'src/table') diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index cb5471af..df930224 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -27,6 +27,8 @@ pub struct TableFullReplication { } impl TableReplication for TableFullReplication { + type WriteSets = Vec>; + fn storage_nodes(&self, _hash: &Hash) -> Vec { let layout = self.system.cluster_layout(); layout.current().all_nodes().to_vec() @@ -39,7 +41,7 @@ impl TableReplication for TableFullReplication { 1 } - fn write_sets(&self, hash: &Hash) -> Vec> { + fn write_sets(&self, hash: &Hash) -> Self::WriteSets { vec![self.storage_nodes(hash)] } fn write_quorum(&self) -> usize { diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 2f842409..a4e701bb 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -3,6 +3,8 @@ use garage_util::data::*; /// Trait to describe how a table shall be replicated pub trait TableReplication: Send + Sync + 'static { + type WriteSets: AsRef>> + Send + Sync + 'static; + // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods @@ -15,7 +17,7 @@ pub trait TableReplication: Send + Sync + 'static { fn read_quorum(&self) -> usize; /// Which nodes to send writes to - fn write_sets(&self, hash: &Hash) -> Vec>; + fn write_sets(&self, hash: &Hash) -> Self::WriteSets; /// Responses needed to consider a write succesfull in each set fn write_quorum(&self) -> usize; fn max_write_errors(&self) -> usize; diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 1320a189..2a16bc0c 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -25,6 +25,8 @@ pub struct TableShardedReplication { } impl TableReplication for TableShardedReplication { + type WriteSets 
= WriteLock>>; + fn storage_nodes(&self, hash: &Hash) -> Vec { self.system.cluster_layout().storage_nodes_of(hash) } @@ -36,8 +38,8 @@ impl TableReplication for TableShardedReplication { self.read_quorum } - fn write_sets(&self, hash: &Hash) -> Vec> { - self.system.cluster_layout().write_sets_of(hash) + fn write_sets(&self, hash: &Hash) -> Self::WriteSets { + self.system.layout_manager.write_sets_of(hash) } fn write_quorum(&self) -> usize { self.write_quorum diff --git a/src/table/sync.rs b/src/table/sync.rs index b67cdd79..efeac402 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -173,12 +173,7 @@ impl TableSyncer { } if !items.is_empty() { - let nodes = self - .data - .replication - .storage_nodes(begin) - .into_iter() - .collect::>(); + let nodes = self.data.replication.storage_nodes(begin); if nodes.contains(&self.system.id) { warn!( "({}) Interrupting offload as partitions seem to have changed", @@ -202,7 +197,7 @@ impl TableSyncer { end, counter ); - self.offload_items(&items, &nodes[..]).await?; + self.offload_items(&items, &nodes).await?; } else { break; } diff --git a/src/table/table.rs b/src/table/table.rs index c2efaeaf..5ec9eb0a 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -128,7 +128,7 @@ impl Table { .rpc_helper() .try_write_many_sets( &self.endpoint, - &who, + who.as_ref(), rpc, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.data.replication.write_quorum()), -- cgit v1.2.3 From 3ecd14b9f6202ad3c5513c6ad7422bd408134002 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 16 Nov 2023 16:41:45 +0100 Subject: table: implement write sets for insert_many --- src/table/table.rs | 157 +++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 127 insertions(+), 30 deletions(-) (limited to 'src/table') diff --git a/src/table/table.rs b/src/table/table.rs index 5ec9eb0a..7d1ff31c 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -143,7 +143,7 @@ impl Table { self.data.queue_insert(tx, e) } - pub async fn insert_many(&self, entries: I) -> Result<(), Error> + pub async fn insert_many(self: &Arc, entries: I) -> Result<(), Error> where I: IntoIterator + Send + Sync, IE: Borrow + Send + Sync, @@ -161,52 +161,149 @@ impl Table { Ok(()) } - async fn insert_many_internal(&self, entries: I) -> Result<(), Error> + async fn insert_many_internal(self: &Arc, entries: I) -> Result<(), Error> where I: IntoIterator + Send + Sync, IE: Borrow + Send + Sync, { - let mut call_list: HashMap<_, Vec<_>> = HashMap::new(); - + // The different items will have to be stored on possibly different nodes. + // We will here batch all items into a single request for each concerned + // node, with all of the entries it must store within that request. + // Each entry has to be saved to a specific list of "write sets", i.e. a set + // of node within wich a quorum must be achieved. In normal operation, there + // is a single write set which corresponds to the quorum in the current + // cluster layout, but when the layout is updated, multiple write sets might + // have to be handled at once. Here, since we are sending many entries, we + // will have to handle many write sets in all cases. The algorihtm is thus + // to send one request to each node with all the items it must save, + // and keep track of the OK responses within each write set: if for all sets + // a quorum of nodes has answered OK, then the insert has succeeded and + // consistency properties (read-after-write) are preserved. 
+ + // Some code here might feel redundant with RpcHelper::try_write_many_sets, + // but I think deduplicating could lead to more spaghetti instead of + // improving the readability, so I'm leaving as is. + + let quorum = self.data.replication.write_quorum(); + + // Serialize all entries and compute the write sets for each of them. + // In the case of sharded table replication, this also takes an "ack lock" + // to the layout manager to avoid ack'ing newer versions which are not + // taken into account by writes in progress (the ack can happen later, once + // all writes that didn't take the new layout into account are finished). + // These locks are released when entries_vec is dropped, i.e. when this + // function returns. + let mut entries_vec = Vec::new(); for entry in entries.into_iter() { let entry = entry.borrow(); let hash = entry.partition_key().hash(); - // TODO: use write sets - let who = self.data.replication.storage_nodes(&hash); + let write_sets = self.data.replication.write_sets(&hash); let e_enc = Arc::new(ByteBuf::from(entry.encode()?)); - for node in who { - call_list.entry(node).or_default().push(e_enc.clone()); + entries_vec.push((write_sets, e_enc)); + } + + // Compute a deduplicated list of all of the write sets, + // and compute an index from each node to the position of the sets in which + // it takes part, to optimize the detection of a quorum. + let mut write_sets = entries_vec + .iter() + .map(|(wss, _)| wss.as_ref().iter().map(|ws| ws.as_slice())) + .flatten() + .collect::>(); + write_sets.sort(); + write_sets.dedup(); + let mut write_set_index = HashMap::<&Uuid, Vec>::new(); + for (i, write_set) in write_sets.iter().enumerate() { + for node in write_set.iter() { + write_set_index.entry(node).or_default().push(i); } } - let call_futures = call_list.drain().map(|(node, entries)| async move { - let rpc = TableRpc::::Update(entries); - - let resp = self - .system - .rpc_helper() - .call( - &self.endpoint, - node, - rpc, - RequestStrategy::with_priority(PRIO_NORMAL), - ) - .await?; - Ok::<_, Error>((node, resp)) + // Build a map of all nodes to the entries that must be sent to that node. + let mut call_list: HashMap> = HashMap::new(); + for (write_sets, entry_enc) in entries_vec.iter() { + for write_set in write_sets.as_ref().iter() { + for node in write_set.iter() { + call_list.entry(*node).or_default().push(entry_enc.clone()) + } + } + } + + // Build futures to actually perform each of the corresponding RPC calls + let call_count = call_list.len(); + let call_futures = call_list.into_iter().map(|(node, entries)| { + let this = self.clone(); + let tracer = opentelemetry::global::tracer("garage"); + let span = tracer.start(format!("RPC to {:?}", node)); + let fut = async move { + let rpc = TableRpc::::Update(entries); + let resp = this + .system + .rpc_helper() + .call( + &this.endpoint, + node, + rpc, + RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(quorum), + ) + .await; + (node, resp) + }; + fut.with_context(Context::current_with_span(span)) }); + + // Run all requests in parallel thanks to FuturesUnordered, and collect results. 
let mut resps = call_futures.collect::>(); + let mut set_counters = vec![(0, 0); write_sets.len()]; + let mut successes = 0; let mut errors = vec![]; - while let Some(resp) = resps.next().await { - if let Err(e) = resp { - errors.push(e); + while let Some((node, resp)) = resps.next().await { + match resp { + Ok(_) => { + successes += 1; + for set in write_set_index.get(&node).unwrap().iter() { + set_counters[*set].0 += 1; + } + } + Err(e) => { + errors.push(e); + for set in write_set_index.get(&node).unwrap().iter() { + set_counters[*set].1 += 1; + } + } + } + + if set_counters.iter().all(|(ok_cnt, _)| *ok_cnt >= quorum) { + // Success + + // Continue all other requests in background + tokio::spawn(async move { + resps.collect::)>>().await; + }); + + return Ok(()); + } + + if set_counters + .iter() + .enumerate() + .any(|(i, (_, err_cnt))| err_cnt + quorum > write_sets[i].len()) + { + // Too many errors in this set, we know we won't get a quorum + break; } } - if errors.len() > self.data.replication.max_write_errors() { - Err(Error::Message("Too many errors".into())) - } else { - Ok(()) - } + + // Failure, could not get quorum within at least one set + let errors = errors.iter().map(|e| format!("{}", e)).collect::>(); + Err(Error::Quorum( + quorum, + Some(write_sets.len()), + successes, + call_count, + errors, + )) } pub async fn get( -- cgit v1.2.3 From 95eb13eb08d517d328e3c8aeb222440a27211ee9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Dec 2023 10:55:15 +0100 Subject: rpc: refactor result tracking for quorum sets --- src/table/replication/parameters.rs | 2 +- src/table/table.rs | 54 ++++++++----------------------------- 2 files changed, 12 insertions(+), 44 deletions(-) (limited to 'src/table') diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index a4e701bb..db11ff5f 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -3,7 +3,7 @@ use garage_util::data::*; /// Trait to describe how a table shall be replicated pub trait TableReplication: Send + Sync + 'static { - type WriteSets: AsRef>> + Send + Sync + 'static; + type WriteSets: AsRef>> + AsMut>> + Send + Sync + 'static; // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods diff --git a/src/table/table.rs b/src/table/table.rs index 7d1ff31c..6508cf5d 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -20,6 +20,7 @@ use garage_util::error::Error; use garage_util::metrics::RecordDuration; use garage_util::migrate::Migrate; +use garage_rpc::rpc_helper::QuorumSetResultTracker; use garage_rpc::system::System; use garage_rpc::*; @@ -180,10 +181,6 @@ impl Table { // a quorum of nodes has answered OK, then the insert has succeeded and // consistency properties (read-after-write) are preserved. - // Some code here might feel redundant with RpcHelper::try_write_many_sets, - // but I think deduplicating could lead to more spaghetti instead of - // improving the readability, so I'm leaving as is. - let quorum = self.data.replication.write_quorum(); // Serialize all entries and compute the write sets for each of them. 
@@ -197,7 +194,10 @@ impl Table { for entry in entries.into_iter() { let entry = entry.borrow(); let hash = entry.partition_key().hash(); - let write_sets = self.data.replication.write_sets(&hash); + let mut write_sets = self.data.replication.write_sets(&hash); + for set in write_sets.as_mut().iter_mut() { + set.sort(); + } let e_enc = Arc::new(ByteBuf::from(entry.encode()?)); entries_vec.push((write_sets, e_enc)); } @@ -212,12 +212,8 @@ impl Table { .collect::>(); write_sets.sort(); write_sets.dedup(); - let mut write_set_index = HashMap::<&Uuid, Vec>::new(); - for (i, write_set) in write_sets.iter().enumerate() { - for node in write_set.iter() { - write_set_index.entry(node).or_default().push(i); - } - } + + let mut result_tracker = QuorumSetResultTracker::new(&write_sets, quorum); // Build a map of all nodes to the entries that must be sent to that node. let mut call_list: HashMap> = HashMap::new(); @@ -230,7 +226,6 @@ impl Table { } // Build futures to actually perform each of the corresponding RPC calls - let call_count = call_list.len(); let call_futures = call_list.into_iter().map(|(node, entries)| { let this = self.clone(); let tracer = opentelemetry::global::tracer("garage"); @@ -254,27 +249,11 @@ impl Table { // Run all requests in parallel thanks to FuturesUnordered, and collect results. let mut resps = call_futures.collect::>(); - let mut set_counters = vec![(0, 0); write_sets.len()]; - let mut successes = 0; - let mut errors = vec![]; while let Some((node, resp)) = resps.next().await { - match resp { - Ok(_) => { - successes += 1; - for set in write_set_index.get(&node).unwrap().iter() { - set_counters[*set].0 += 1; - } - } - Err(e) => { - errors.push(e); - for set in write_set_index.get(&node).unwrap().iter() { - set_counters[*set].1 += 1; - } - } - } + result_tracker.register_result(node, resp.map(|_| ())); - if set_counters.iter().all(|(ok_cnt, _)| *ok_cnt >= quorum) { + if result_tracker.all_quorums_ok() { // Success // Continue all other requests in background @@ -285,25 +264,14 @@ impl Table { return Ok(()); } - if set_counters - .iter() - .enumerate() - .any(|(i, (_, err_cnt))| err_cnt + quorum > write_sets[i].len()) - { + if result_tracker.too_many_failures() { // Too many errors in this set, we know we won't get a quorum break; } } // Failure, could not get quorum within at least one set - let errors = errors.iter().map(|e| format!("{}", e)).collect::>(); - Err(Error::Quorum( - quorum, - Some(write_sets.len()), - successes, - call_count, - errors, - )) + Err(result_tracker.quorum_error()) } pub async fn get( -- cgit v1.2.3 From d90de365b3b30cb631b22fcd62c98bddb5a91549 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Dec 2023 11:16:10 +0100 Subject: table sync: use write quorums to report global success or failure of sync --- src/table/replication/fullcopy.rs | 3 +-- src/table/replication/parameters.rs | 2 +- src/table/replication/sharded.rs | 4 +-- src/table/sync.rs | 51 ++++++++++++++++++++++--------------- 4 files changed, 34 insertions(+), 26 deletions(-) (limited to 'src/table') diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index df930224..30122f39 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -1,4 +1,3 @@ -use std::iter::FromIterator; use std::sync::Arc; use garage_rpc::layout::*; @@ -69,7 +68,7 @@ impl TableReplication for TableFullReplication { partition: 0u16, first_hash: [0u8; 32].into(), last_hash: [0xff; 32].into(), - storage_nodes: 
Vec::from_iter(layout.current().all_nodes().to_vec()), + storage_sets: vec![layout.current().all_nodes().to_vec()], }], } } diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index db11ff5f..78470f35 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -40,5 +40,5 @@ pub struct SyncPartition { pub partition: Partition, pub first_hash: Hash, pub last_hash: Hash, - pub storage_nodes: Vec, + pub storage_sets: Vec>, } diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 2a16bc0c..55d0029d 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -60,12 +60,12 @@ impl TableReplication for TableShardedReplication { .current() .partitions() .map(|(partition, first_hash)| { - let storage_nodes = layout.storage_nodes_of(&first_hash); + let storage_sets = layout.storage_sets_of(&first_hash); SyncPartition { partition, first_hash, last_hash: [0u8; 32].into(), // filled in just after - storage_nodes, + storage_sets, } }) .collect::>(); diff --git a/src/table/sync.rs b/src/table/sync.rs index efeac402..cfcbc4b5 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -18,6 +18,7 @@ use garage_util::encode::{debug_serialize, nonversioned_encode}; use garage_util::error::{Error, OkOrMessage}; use garage_rpc::layout::*; +use garage_rpc::rpc_helper::QuorumSetResultTracker; use garage_rpc::system::System; use garage_rpc::*; @@ -106,44 +107,52 @@ impl TableSyncer { must_exit: &mut watch::Receiver, ) -> Result<(), Error> { let my_id = self.system.id; - let retain = partition.storage_nodes.contains(&my_id); + let retain = partition.storage_sets.iter().any(|x| x.contains(&my_id)); if retain { debug!( "({}) Syncing {:?} with {:?}...", F::TABLE_NAME, partition, - partition.storage_nodes + partition.storage_sets ); - let mut sync_futures = partition - .storage_nodes + let mut result_tracker = QuorumSetResultTracker::new( + &partition.storage_sets, + self.data.replication.write_quorum(), + ); + + let mut sync_futures = result_tracker + .nodes .iter() - .filter(|node| **node != my_id) + .map(|(node, _)| *node) .map(|node| { - self.clone() - .do_sync_with(&partition, *node, must_exit.clone()) + let must_exit = must_exit.clone(); + async move { + if node == my_id { + (node, Ok(())) + } else { + (node, self.do_sync_with(&partition, node, must_exit).await) + } + } }) .collect::>(); - let mut n_errors = 0; - while let Some(r) = sync_futures.next().await { - if let Err(e) = r { - n_errors += 1; - warn!("({}) Sync error: {}", F::TABLE_NAME, e); + while let Some((node, res)) = sync_futures.next().await { + if let Err(e) = &res { + warn!("({}) Sync error with {:?}: {}", F::TABLE_NAME, node, e); } + result_tracker.register_result(node, res); } - if n_errors > 0 { - return Err(Error::Message(format!( - "Sync failed with {} nodes.", - n_errors - ))); + + if result_tracker.too_many_failures() { + return Err(result_tracker.quorum_error()); + } else { + Ok(()) } } else { self.offload_partition(&partition.first_hash, &partition.last_hash, must_exit) - .await?; + .await } - - Ok(()) } // Offload partition: this partition is not something we are storing, @@ -264,7 +273,7 @@ impl TableSyncer { } async fn do_sync_with( - self: Arc, + self: &Arc, partition: &SyncPartition, who: Uuid, must_exit: watch::Receiver, -- cgit v1.2.3 From f8df90b79b93e4a1391839435718bad8c697246d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 8 Dec 2023 14:54:11 +0100 Subject: table: fix insert_many to not send 
From f8df90b79b93e4a1391839435718bad8c697246d Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Fri, 8 Dec 2023 14:54:11 +0100
Subject: table: fix insert_many to not send duplicates

---
 src/table/table.rs | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

(limited to 'src/table')

diff --git a/src/table/table.rs b/src/table/table.rs
index 6508cf5d..59cfdd07 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -196,6 +196,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 			let hash = entry.partition_key().hash();
 			let mut write_sets = self.data.replication.write_sets(&hash);
 			for set in write_sets.as_mut().iter_mut() {
+				// Sort nodes in each write sets to merge write sets with same
+				// nodes but in possibly different orders
 				set.sort();
 			}
 			let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
@@ -220,7 +222,16 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 		for (write_sets, entry_enc) in entries_vec.iter() {
 			for write_set in write_sets.as_ref().iter() {
 				for node in write_set.iter() {
-					call_list.entry(*node).or_default().push(entry_enc.clone())
+					let node_entries = call_list.entry(*node).or_default();
+					match node_entries.last() {
+						Some(x) if Arc::ptr_eq(x, entry_enc) => {
+							// skip if entry already in list to send to this node
+							// (could happen if node is in several write sets for this entry)
+						}
+						_ => {
+							node_entries.push(entry_enc.clone());
+						}
+					}
 				}
 			}
 		}
-- cgit v1.2.3
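The dedup works because, for one entry, all of its write sets are walked consecutively, so a duplicate for a given node can only ever be the element pushed immediately before; a pointer-equality check on the last element of that node's list is therefore sufficient. A small stand-alone sketch of the check follows (Vec<u8> stands in for the entry's encoded ByteBuf, an assumption made to keep the example dependency-free):

use std::sync::Arc;

// Push an encoded entry into one node's call list, skipping it if the very
// same Arc was already pushed by a previous write set of the same entry.
fn push_dedup(list: &mut Vec<Arc<Vec<u8>>>, entry: &Arc<Vec<u8>>) {
    match list.last() {
        // Pointer equality: same Arc, i.e. the same encoded entry.
        Some(prev) if Arc::ptr_eq(prev, entry) => {}
        _ => list.push(entry.clone()),
    }
}

fn main() {
    let e = Arc::new(vec![1u8, 2, 3]);
    let mut per_node = Vec::new();
    push_dedup(&mut per_node, &e); // first write set containing the node
    push_dedup(&mut per_node, &e); // second write set containing the same node
    assert_eq!(per_node.len(), 1); // the entry is only queued once
}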
From e4f493b48156e6e30f16fba10f300f6cb5fe0b0d Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Mon, 11 Dec 2023 14:57:42 +0100
Subject: table: remove redundant tracing in insert_many

---
 src/table/table.rs | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

(limited to 'src/table')

diff --git a/src/table/table.rs b/src/table/table.rs
index 59cfdd07..05a0dab1 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -239,9 +239,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 		// Build futures to actually perform each of the corresponding RPC calls
 		let call_futures = call_list.into_iter().map(|(node, entries)| {
 			let this = self.clone();
-			let tracer = opentelemetry::global::tracer("garage");
-			let span = tracer.start(format!("RPC to {:?}", node));
-			let fut = async move {
+			async move {
 				let rpc = TableRpc::<F>::Update(entries);
 				let resp = this
 					.system
@@ -254,8 +252,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 					)
 					.await;
 				(node, resp)
-			};
-			fut.with_context(Context::current_with_span(span))
+			}
 		});
 
 		// Run all requests in parallel thanks to FuturesUnordered, and collect results.
-- cgit v1.2.3

From 85b5a6bcd11c0a7651e4c589569e1935a3d18e46 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Mon, 11 Dec 2023 15:31:47 +0100
Subject: fix some clippy lints

---
 src/table/sync.rs  | 8 ++++----
 src/table/table.rs | 3 +--
 2 files changed, 5 insertions(+), 6 deletions(-)

(limited to 'src/table')

diff --git a/src/table/sync.rs b/src/table/sync.rs
index cfcbc4b5..1561a2e5 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -123,15 +123,15 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
 			let mut sync_futures = result_tracker
 				.nodes
-				.iter()
-				.map(|(node, _)| *node)
+				.keys()
+				.copied()
 				.map(|node| {
 					let must_exit = must_exit.clone();
 					async move {
 						if node == my_id {
 							(node, Ok(()))
 						} else {
-							(node, self.do_sync_with(&partition, node, must_exit).await)
+							(node, self.do_sync_with(partition, node, must_exit).await)
 						}
 					}
 				})
 				.collect::<FuturesUnordered<_>>();
@@ -145,7 +145,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
 			}
 
 			if result_tracker.too_many_failures() {
-				return Err(result_tracker.quorum_error());
+				Err(result_tracker.quorum_error())
 			} else {
 				Ok(())
 			}
diff --git a/src/table/table.rs b/src/table/table.rs
index 05a0dab1..a5be2910 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -209,8 +209,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 		// it takes part, to optimize the detection of a quorum.
 		let mut write_sets = entries_vec
 			.iter()
-			.map(|(wss, _)| wss.as_ref().iter().map(|ws| ws.as_slice()))
-			.flatten()
+			.flat_map(|(wss, _)| wss.as_ref().iter().map(|ws| ws.as_slice()))
 			.collect::<Vec<_>>();
 		write_sets.sort();
 		write_sets.dedup();
-- cgit v1.2.3

From 0041b013a473e3ae72f50209d8f79db75a72848b Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Mon, 11 Dec 2023 16:09:22 +0100
Subject: layout: refactoring and fix in layout helper

---
 src/table/replication/sharded.rs |  2 +-
 src/table/sync.rs                | 16 +++++++---------
 2 files changed, 8 insertions(+), 10 deletions(-)

(limited to 'src/table')

diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index 55d0029d..8ba3700f 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -54,7 +54,7 @@ impl TableReplication for TableShardedReplication {
 	fn sync_partitions(&self) -> SyncPartitions {
 		let layout = self.system.cluster_layout();
-		let layout_version = layout.all_ack();
+		let layout_version = layout.ack_map_min();
 
 		let mut partitions = layout
 			.current()
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 1561a2e5..cd080df0 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -83,7 +83,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
 		bg.spawn_worker(SyncWorker {
 			syncer: self.clone(),
 			layout_notify: self.system.layout_notify(),
-			layout_versions: self.system.cluster_layout().sync_versions(),
+			layout_digest: self.system.cluster_layout().sync_digest(),
 			add_full_sync_rx,
 			todo: None,
 			next_full_sync: Instant::now() + Duration::from_secs(20),
@@ -483,7 +483,7 @@ struct SyncWorker<F: TableSchema, R: TableReplication> {
 	syncer: Arc<TableSyncer<F, R>>,
 
 	layout_notify: Arc<Notify>,
-	layout_versions: (u64, u64, u64),
+	layout_digest: SyncLayoutDigest,
 
 	add_full_sync_rx: mpsc::UnboundedReceiver<()>,
 	next_full_sync: Instant,
@@ -493,15 +493,13 @@ impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
 	fn check_add_full_sync(&mut self) {
-		let layout_versions = self.syncer.system.cluster_layout().sync_versions();
-		if layout_versions != self.layout_versions {
-			self.layout_versions = layout_versions;
+		let layout_digest = self.syncer.system.cluster_layout().sync_digest();
+		if layout_digest != self.layout_digest {
+			self.layout_digest = layout_digest;
 			info!(
-				"({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list",
+				"({}) Layout versions changed ({:?}), adding full sync to syncer todo list",
 				F::TABLE_NAME,
-				layout_versions.0,
-				layout_versions.1,
-				layout_versions.2
+				layout_digest,
 			);
 			self.add_full_sync();
 		}
-- cgit v1.2.3
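The last commit swaps the ad-hoc (u64, u64, u64) layout version triple for an opaque SyncLayoutDigest that is simply compared for equality. The patch does not show what the digest contains, so the fields in the sketch below are invented; only the compare-and-queue-a-full-sync pattern of check_add_full_sync is illustrated.

// Made-up digest fields, purely for illustration; the real SyncLayoutDigest
// lives in garage_rpc::layout and only its PartialEq comparison matters here.
#[derive(Debug, PartialEq, Eq)]
struct LayoutDigestSketch {
    current_version: u64,
    ack_map_min: u64,
}

struct SyncWorkerSketch {
    layout_digest: LayoutDigestSketch,
    full_syncs_queued: u32,
}

impl SyncWorkerSketch {
    fn check_add_full_sync(&mut self, new_digest: LayoutDigestSketch) {
        // Any observable change in the digest triggers exactly one full sync,
        // like the diff above; an unchanged digest does nothing.
        if new_digest != self.layout_digest {
            self.layout_digest = new_digest;
            self.full_syncs_queued += 1; // stands in for self.add_full_sync()
        }
    }
}

fn main() {
    let mut w = SyncWorkerSketch {
        layout_digest: LayoutDigestSketch { current_version: 1, ack_map_min: 1 },
        full_syncs_queued: 0,
    };
    w.check_add_full_sync(LayoutDigestSketch { current_version: 2, ack_map_min: 1 });
    w.check_add_full_sync(LayoutDigestSketch { current_version: 2, ack_map_min: 1 });
    assert_eq!(w.full_syncs_queued, 1); // unchanged digest does not re-queue
}

Keeping the comparison opaque means the sync worker no longer needs to change when more layout state is folded into the digest.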