Diffstat (limited to 'src/table/sync.rs')
-rw-r--r--   src/table/sync.rs   249
1 file changed, 116 insertions, 133 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 92a353c6..cd080df0 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -6,18 +6,19 @@
 use arc_swap::ArcSwapOption;
 use async_trait::async_trait;
 use futures_util::stream::*;
 use opentelemetry::KeyValue;
-use rand::Rng;
+use rand::prelude::*;
 use serde::{Deserialize, Serialize};
 use serde_bytes::ByteBuf;
 use tokio::select;
-use tokio::sync::{mpsc, watch};
+use tokio::sync::{mpsc, watch, Notify};

 use garage_util::background::*;
 use garage_util::data::*;
 use garage_util::encode::{debug_serialize, nonversioned_encode};
 use garage_util::error::{Error, OkOrMessage};

-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
+use garage_rpc::rpc_helper::QuorumSetResultTracker;
 use garage_rpc::system::System;
 use garage_rpc::*;
@@ -52,16 +53,6 @@ impl Rpc for SyncRpc {
 	type Response = Result<SyncRpc, Error>;
 }

-#[derive(Debug, Clone)]
-struct TodoPartition {
-	partition: Partition,
-	begin: Hash,
-	end: Hash,
-
-	// Are we a node that stores this partition or not?
-	retain: bool,
-}
-
 impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
 	pub(crate) fn new(
 		system: Arc<System>,
@@ -91,10 +82,10 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {

 		bg.spawn_worker(SyncWorker {
 			syncer: self.clone(),
-			ring_recv: self.system.ring.clone(),
-			ring: self.system.ring.borrow().clone(),
+			layout_notify: self.system.layout_notify(),
+			layout_digest: self.system.cluster_layout().sync_digest(),
 			add_full_sync_rx,
-			todo: vec![],
+			todo: None,
 			next_full_sync: Instant::now() + Duration::from_secs(20),
 		});
 	}
@@ -112,53 +103,56 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
 	async fn sync_partition(
 		self: &Arc<Self>,
-		partition: &TodoPartition,
+		partition: &SyncPartition,
 		must_exit: &mut watch::Receiver<bool>,
 	) -> Result<(), Error> {
-		if partition.retain {
-			let my_id = self.system.id;
-
-			let nodes = self
-				.data
-				.replication
-				.write_nodes(&partition.begin)
-				.into_iter()
-				.filter(|node| *node != my_id)
-				.collect::<Vec<_>>();
+		let my_id = self.system.id;
+		let retain = partition.storage_sets.iter().any(|x| x.contains(&my_id));

+		if retain {
 			debug!(
 				"({}) Syncing {:?} with {:?}...",
 				F::TABLE_NAME,
 				partition,
-				nodes
+				partition.storage_sets
 			);
-			let mut sync_futures = nodes
-				.iter()
+			let mut result_tracker = QuorumSetResultTracker::new(
+				&partition.storage_sets,
+				self.data.replication.write_quorum(),
+			);
+
+			let mut sync_futures = result_tracker
+				.nodes
+				.keys()
+				.copied()
 				.map(|node| {
-					self.clone()
-						.do_sync_with(partition.clone(), *node, must_exit.clone())
+					let must_exit = must_exit.clone();
+					async move {
+						if node == my_id {
+							(node, Ok(()))
+						} else {
+							(node, self.do_sync_with(partition, node, must_exit).await)
+						}
+					}
 				})
 				.collect::<FuturesUnordered<_>>();

-			let mut n_errors = 0;
-			while let Some(r) = sync_futures.next().await {
-				if let Err(e) = r {
-					n_errors += 1;
-					warn!("({}) Sync error: {}", F::TABLE_NAME, e);
+			while let Some((node, res)) = sync_futures.next().await {
+				if let Err(e) = &res {
+					warn!("({}) Sync error with {:?}: {}", F::TABLE_NAME, node, e);
 				}
+				result_tracker.register_result(node, res);
 			}
-			if n_errors > self.data.replication.max_write_errors() {
-				return Err(Error::Message(format!(
-					"Sync failed with too many nodes (should have been: {:?}).",
-					nodes
-				)));
+
+			if result_tracker.too_many_failures() {
+				Err(result_tracker.quorum_error())
+			} else {
+				Ok(())
 			}
 		} else {
-			self.offload_partition(&partition.begin, &partition.end, must_exit)
-				.await?;
+			self.offload_partition(&partition.first_hash, &partition.last_hash, must_exit)
+				.await
 		}
-
-		Ok(())
 	}

 	// Offload partition: this partition is not something we are storing,
@@ -188,12 +182,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
 			}

 			if !items.is_empty() {
-				let nodes = self
-					.data
-					.replication
-					.write_nodes(begin)
-					.into_iter()
-					.collect::<Vec<_>>();
+				let nodes = self.data.replication.storage_nodes(begin);
 				if nodes.contains(&self.system.id) {
 					warn!(
 						"({}) Interrupting offload as partitions seem to have changed",
@@ -217,7 +206,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
 						end,
 						counter
 					);
-					self.offload_items(&items, &nodes[..]).await?;
+					self.offload_items(&items, &nodes).await?;
 				} else {
 					break;
 				}
@@ -244,7 +233,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
 		}

 		self.system
-			.rpc
+			.rpc_helper()
 			.try_call_many(
 				&self.endpoint,
 				nodes,
@@ -284,8 +273,8 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
 	}

 	async fn do_sync_with(
-		self: Arc<Self>,
-		partition: TodoPartition,
+		self: &Arc<Self>,
+		partition: &SyncPartition,
 		who: Uuid,
 		must_exit: watch::Receiver<bool>,
 	) -> Result<(), Error> {
@@ -305,7 +294,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
 		// If so, do nothing.
 		let root_resp = self
 			.system
-			.rpc
+			.rpc_helper()
 			.call(
 				&self.endpoint,
 				who,
@@ -361,7 +350,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
 		// and compare it with local node
 		let remote_node = match self
 			.system
-			.rpc
+			.rpc_helper()
 			.call(
 				&self.endpoint,
 				who,
@@ -437,7 +426,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {

 		let rpc_resp = self
 			.system
-			.rpc
+			.rpc_helper()
 			.call(
 				&self.endpoint,
 				who,
@@ -492,75 +481,41 @@ impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSync
 struct SyncWorker<F: TableSchema, R: TableReplication> {
 	syncer: Arc<TableSyncer<F, R>>,
-	ring_recv: watch::Receiver<Arc<Ring>>,
-	ring: Arc<Ring>,
+
+	layout_notify: Arc<Notify>,
+	layout_digest: SyncLayoutDigest,
+
 	add_full_sync_rx: mpsc::UnboundedReceiver<()>,
-	todo: Vec<TodoPartition>,
 	next_full_sync: Instant,
+
+	todo: Option<SyncPartitions>,
 }

 impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
-	fn add_full_sync(&mut self) {
-		let system = &self.syncer.system;
-		let data = &self.syncer.data;
-
-		let my_id = system.id;
-
-		self.todo.clear();
-
-		let partitions = data.replication.partitions();
-
-		for i in 0..partitions.len() {
-			let begin = partitions[i].1;
-
-			let end = if i + 1 < partitions.len() {
-				partitions[i + 1].1
-			} else {
-				[0xFFu8; 32].into()
-			};
-
-			let nodes = data.replication.write_nodes(&begin);
-
-			let retain = nodes.contains(&my_id);
-			if !retain {
-				// Check if we have some data to send, otherwise skip
-				match data.store.range(begin..end) {
-					Ok(mut iter) => {
-						if iter.next().is_none() {
-							continue;
-						}
-					}
-					Err(e) => {
-						warn!("DB error in add_full_sync: {}", e);
-						continue;
-					}
-				}
-			}
-
-			self.todo.push(TodoPartition {
-				partition: partitions[i].0,
-				begin,
-				end,
-				retain,
-			});
+	fn check_add_full_sync(&mut self) {
+		let layout_digest = self.syncer.system.cluster_layout().sync_digest();
+		if layout_digest != self.layout_digest {
+			self.layout_digest = layout_digest;
+			info!(
+				"({}) Layout versions changed ({:?}), adding full sync to syncer todo list",
+				F::TABLE_NAME,
+				layout_digest,
+			);
+			self.add_full_sync();
 		}
-
-		self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
 	}

-	fn pop_task(&mut self) -> Option<TodoPartition> {
-		if self.todo.is_empty() {
-			return None;
-		}
+	fn add_full_sync(&mut self) {
+		let mut partitions = self.syncer.data.replication.sync_partitions();
+		info!(
+			"{}: Adding full sync for ack layout version {}",
+			F::TABLE_NAME,
+			partitions.layout_version
+		);

-		let i = rand::thread_rng().gen_range(0..self.todo.len());
-		if i == self.todo.len() - 1 {
-			self.todo.pop()
-		} else {
-			let replacement = self.todo.pop().unwrap();
-			let ret = std::mem::replace(&mut self.todo[i], replacement);
-			Some(ret)
-		}
+		partitions.partitions.shuffle(&mut thread_rng());
+		self.todo = Some(partitions);
+		self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
 	}
 }
@@ -572,14 +527,48 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {

 	fn status(&self) -> WorkerStatus {
 		WorkerStatus {
-			queue_length: Some(self.todo.len() as u64),
+			queue_length: Some(self.todo.as_ref().map(|x| x.partitions.len()).unwrap_or(0) as u64),
 			..Default::default()
 		}
 	}

 	async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
-		if let Some(partition) = self.pop_task() {
-			self.syncer.sync_partition(&partition, must_exit).await?;
+		self.check_add_full_sync();
+
+		if let Some(todo) = &mut self.todo {
+			let partition = todo.partitions.pop().unwrap();
+
+			// process partition
+			if let Err(e) = self.syncer.sync_partition(&partition, must_exit).await {
+				error!(
+					"{}: Failed to sync partition {:?}: {}",
+					F::TABLE_NAME,
+					partition,
+					e
+				);
+				// if error, put partition back at the other side of the queue,
+				// so that other partitions will be tried in the meantime
+				todo.partitions.insert(0, partition);
+				// TODO: returning an error here will cause the background job worker
+				// to delay this task for some time, but maybe we don't want to
+				// delay it if there are lots of failures from nodes that are gone
+				// (we also don't want zero delays as that will cause lots of useless retries)
+				return Err(e);
+			}
+
+			if todo.partitions.is_empty() {
+				info!(
+					"{}: Completed full sync for ack layout version {}",
+					F::TABLE_NAME,
+					todo.layout_version
+				);
+				self.syncer
+					.system
+					.layout_manager
+					.sync_table_until(F::TABLE_NAME, todo.layout_version);
+				self.todo = None;
+			}
+
 			Ok(WorkerState::Busy)
 		} else {
 			Ok(WorkerState::Idle)
@@ -593,22 +582,16 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
 					self.add_full_sync();
 				}
 			},
-			_ = self.ring_recv.changed() => {
-				let new_ring = self.ring_recv.borrow();
-				if !Arc::ptr_eq(&new_ring, &self.ring) {
-					self.ring = new_ring.clone();
-					drop(new_ring);
-					debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
-					self.add_full_sync();
-				}
+			_ = self.layout_notify.notified() => {
+				self.check_add_full_sync();
 			},
 			_ = tokio::time::sleep_until(self.next_full_sync.into()) => {
 				self.add_full_sync();
 			}
 		}
-		match self.todo.is_empty() {
-			false => WorkerState::Busy,
-			true => WorkerState::Idle,
+		match self.todo.is_some() {
+			true => WorkerState::Busy,
+			false => WorkerState::Idle,
 		}
 	}
 }
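
Note: the quorum logic above replaces the old "count errors against max_write_errors" check; sync of a partition now fails as soon as any of its storage sets can no longer reach the write quorum. Below is a minimal illustrative sketch of that idea. It is not Garage's actual garage_rpc::rpc_helper::QuorumSetResultTracker (whose API is richer), and all names and signatures in it are hypothetical.

// Illustrative sketch only (not Garage code): a much-simplified stand-in for
// the quorum-set result tracking that sync_partition now relies on.
use std::collections::{HashMap, HashSet};

type NodeId = u64;

struct SimpleQuorumTracker {
    // the storage sets of the partition being synced
    sets: Vec<Vec<NodeId>>,
    successes: HashSet<NodeId>,
    failures: HashMap<NodeId, String>,
    // number of nodes of each set that must succeed
    quorum: usize,
}

impl SimpleQuorumTracker {
    fn new(sets: &[Vec<NodeId>], quorum: usize) -> Self {
        Self {
            sets: sets.to_vec(),
            successes: HashSet::new(),
            failures: HashMap::new(),
            quorum,
        }
    }

    fn register_result(&mut self, node: NodeId, res: Result<(), String>) {
        match res {
            Ok(()) => {
                self.successes.insert(node);
            }
            Err(e) => {
                self.failures.insert(node, e);
            }
        }
    }

    // The sync is declared failed as soon as *some* storage set has so many
    // failed nodes that it can no longer reach `quorum` successes, even if
    // every still-pending node were to succeed.
    fn too_many_failures(&self) -> bool {
        self.sets.iter().any(|set| {
            let failed = set
                .iter()
                .filter(|n| self.failures.contains_key(*n))
                .count();
            set.len() - failed < self.quorum
        })
    }
}

fn main() {
    // Two overlapping storage sets of three nodes each, write quorum of 2.
    let sets = vec![vec![1, 2, 3], vec![3, 4, 5]];
    let mut tracker = SimpleQuorumTracker::new(&sets, 2);

    tracker.register_result(1, Ok(()));
    tracker.register_result(4, Err("timeout".into()));
    assert!(!tracker.too_many_failures()); // {3, 4, 5} can still reach 2 successes

    tracker.register_result(5, Err("node gone".into()));
    assert!(tracker.too_many_failures()); // {3, 4, 5} has only one node left
    println!("quorum lost: {}", tracker.too_many_failures());
    println!("{} node(s) succeeded so far", tracker.successes.len());
}

Because storage sets may overlap, a single node failure can count against several sets at once, which is why this sketch checks every set instead of keeping one global error count.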
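
The worker's scheduling also changes: add_full_sync() snapshots the partition list from sync_partitions(), shuffles it, and work() pops one partition from the back of the queue, re-inserting a failed partition at the front so that every other partition is attempted before it is retried. A small standalone sketch of that queue discipline, with a hypothetical failure injected in place of sync_partition:

// Illustrative sketch only (not Garage code): the queue discipline used by
// the reworked SyncWorker. Partitions are popped from the back of a shuffled
// list; a partition that fails to sync is pushed back at the front.
use rand::prelude::*;

#[derive(Debug)]
struct Partition(u16);

fn main() {
    let mut todo: Vec<Partition> = (0u16..5).map(Partition).collect();
    todo.shuffle(&mut thread_rng());

    let mut failed_once = false;
    while let Some(p) = todo.pop() {
        // Hypothetical stand-in for TableSyncer::sync_partition:
        // partition 3 fails on its first attempt, everything else succeeds.
        let res: Result<(), &str> = if p.0 == 3 && !failed_once {
            failed_once = true;
            Err("peer unreachable")
        } else {
            Ok(())
        };

        match res {
            Ok(()) => println!("synced {:?}", p),
            Err(e) => {
                println!("sync of {:?} failed ({}), requeueing at the front", p, e);
                todo.insert(0, p); // retried only after the rest of the queue
            }
        }
    }
    println!("full sync complete");
}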