Diffstat (limited to 'src/table/sync.rs')
-rw-r--r--  src/table/sync.rs  249
1 file changed, 116 insertions(+), 133 deletions(-)
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 92a353c6..cd080df0 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -6,18 +6,19 @@ use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use futures_util::stream::*;
use opentelemetry::KeyValue;
-use rand::Rng;
+use rand::prelude::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use tokio::select;
-use tokio::sync::{mpsc, watch};
+use tokio::sync::{mpsc, watch, Notify};
use garage_util::background::*;
use garage_util::data::*;
use garage_util::encode::{debug_serialize, nonversioned_encode};
use garage_util::error::{Error, OkOrMessage};
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
+use garage_rpc::rpc_helper::QuorumSetResultTracker;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -52,16 +53,6 @@ impl Rpc for SyncRpc {
type Response = Result<SyncRpc, Error>;
}
-#[derive(Debug, Clone)]
-struct TodoPartition {
- partition: Partition,
- begin: Hash,
- end: Hash,
-
- // Are we a node that stores this partition or not?
- retain: bool,
-}
-
impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
pub(crate) fn new(
system: Arc<System>,
@@ -91,10 +82,10 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
bg.spawn_worker(SyncWorker {
syncer: self.clone(),
- ring_recv: self.system.ring.clone(),
- ring: self.system.ring.borrow().clone(),
+ layout_notify: self.system.layout_notify(),
+ layout_digest: self.system.cluster_layout().sync_digest(),
add_full_sync_rx,
- todo: vec![],
+ todo: None,
next_full_sync: Instant::now() + Duration::from_secs(20),
});
}
@@ -112,53 +103,56 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
async fn sync_partition(
self: &Arc<Self>,
- partition: &TodoPartition,
+ partition: &SyncPartition,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
- if partition.retain {
- let my_id = self.system.id;
-
- let nodes = self
- .data
- .replication
- .write_nodes(&partition.begin)
- .into_iter()
- .filter(|node| *node != my_id)
- .collect::<Vec<_>>();
+ let my_id = self.system.id;
+ let retain = partition.storage_sets.iter().any(|x| x.contains(&my_id));
+ if retain {
debug!(
"({}) Syncing {:?} with {:?}...",
F::TABLE_NAME,
partition,
- nodes
+ partition.storage_sets
);
- let mut sync_futures = nodes
- .iter()
+ let mut result_tracker = QuorumSetResultTracker::new(
+ &partition.storage_sets,
+ self.data.replication.write_quorum(),
+ );
+
+ let mut sync_futures = result_tracker
+ .nodes
+ .keys()
+ .copied()
.map(|node| {
- self.clone()
- .do_sync_with(partition.clone(), *node, must_exit.clone())
+ let must_exit = must_exit.clone();
+ async move {
+ if node == my_id {
+ (node, Ok(()))
+ } else {
+ (node, self.do_sync_with(partition, node, must_exit).await)
+ }
+ }
})
.collect::<FuturesUnordered<_>>();
- let mut n_errors = 0;
- while let Some(r) = sync_futures.next().await {
- if let Err(e) = r {
- n_errors += 1;
- warn!("({}) Sync error: {}", F::TABLE_NAME, e);
+ while let Some((node, res)) = sync_futures.next().await {
+ if let Err(e) = &res {
+ warn!("({}) Sync error with {:?}: {}", F::TABLE_NAME, node, e);
}
+ result_tracker.register_result(node, res);
}
- if n_errors > self.data.replication.max_write_errors() {
- return Err(Error::Message(format!(
- "Sync failed with too many nodes (should have been: {:?}).",
- nodes
- )));
+
+ if result_tracker.too_many_failures() {
+ Err(result_tracker.quorum_error())
+ } else {
+ Ok(())
}
} else {
- self.offload_partition(&partition.begin, &partition.end, must_exit)
- .await?;
+ self.offload_partition(&partition.first_hash, &partition.last_hash, must_exit)
+ .await
}
-
- Ok(())
}
// Offload partition: this partition is not something we are storing,
@@ -188,12 +182,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
}
if !items.is_empty() {
- let nodes = self
- .data
- .replication
- .write_nodes(begin)
- .into_iter()
- .collect::<Vec<_>>();
+ let nodes = self.data.replication.storage_nodes(begin);
if nodes.contains(&self.system.id) {
warn!(
"({}) Interrupting offload as partitions seem to have changed",
@@ -217,7 +206,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
end,
counter
);
- self.offload_items(&items, &nodes[..]).await?;
+ self.offload_items(&items, &nodes).await?;
} else {
break;
}
@@ -244,7 +233,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
}
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
nodes,
@@ -284,8 +273,8 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
}
async fn do_sync_with(
- self: Arc<Self>,
- partition: TodoPartition,
+ self: &Arc<Self>,
+ partition: &SyncPartition,
who: Uuid,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
@@ -305,7 +294,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
// If so, do nothing.
let root_resp = self
.system
- .rpc
+ .rpc_helper()
.call(
&self.endpoint,
who,
@@ -361,7 +350,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
// and compare it with local node
let remote_node = match self
.system
- .rpc
+ .rpc_helper()
.call(
&self.endpoint,
who,
@@ -437,7 +426,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
let rpc_resp = self
.system
- .rpc
+ .rpc_helper()
.call(
&self.endpoint,
who,
@@ -492,75 +481,41 @@ impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSync
struct SyncWorker<F: TableSchema, R: TableReplication> {
syncer: Arc<TableSyncer<F, R>>,
- ring_recv: watch::Receiver<Arc<Ring>>,
- ring: Arc<Ring>,
+
+ layout_notify: Arc<Notify>,
+ layout_digest: SyncLayoutDigest,
+
add_full_sync_rx: mpsc::UnboundedReceiver<()>,
- todo: Vec<TodoPartition>,
next_full_sync: Instant,
+
+ todo: Option<SyncPartitions>,
}
impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
- fn add_full_sync(&mut self) {
- let system = &self.syncer.system;
- let data = &self.syncer.data;
-
- let my_id = system.id;
-
- self.todo.clear();
-
- let partitions = data.replication.partitions();
-
- for i in 0..partitions.len() {
- let begin = partitions[i].1;
-
- let end = if i + 1 < partitions.len() {
- partitions[i + 1].1
- } else {
- [0xFFu8; 32].into()
- };
-
- let nodes = data.replication.write_nodes(&begin);
-
- let retain = nodes.contains(&my_id);
- if !retain {
- // Check if we have some data to send, otherwise skip
- match data.store.range(begin..end) {
- Ok(mut iter) => {
- if iter.next().is_none() {
- continue;
- }
- }
- Err(e) => {
- warn!("DB error in add_full_sync: {}", e);
- continue;
- }
- }
- }
-
- self.todo.push(TodoPartition {
- partition: partitions[i].0,
- begin,
- end,
- retain,
- });
+ fn check_add_full_sync(&mut self) {
+ let layout_digest = self.syncer.system.cluster_layout().sync_digest();
+ if layout_digest != self.layout_digest {
+ self.layout_digest = layout_digest;
+ info!(
+ "({}) Layout versions changed ({:?}), adding full sync to syncer todo list",
+ F::TABLE_NAME,
+ layout_digest,
+ );
+ self.add_full_sync();
}
-
- self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
}
- fn pop_task(&mut self) -> Option<TodoPartition> {
- if self.todo.is_empty() {
- return None;
- }
+ fn add_full_sync(&mut self) {
+ let mut partitions = self.syncer.data.replication.sync_partitions();
+ info!(
+ "{}: Adding full sync for ack layout version {}",
+ F::TABLE_NAME,
+ partitions.layout_version
+ );
- let i = rand::thread_rng().gen_range(0..self.todo.len());
- if i == self.todo.len() - 1 {
- self.todo.pop()
- } else {
- let replacement = self.todo.pop().unwrap();
- let ret = std::mem::replace(&mut self.todo[i], replacement);
- Some(ret)
- }
+ partitions.partitions.shuffle(&mut thread_rng());
+ self.todo = Some(partitions);
+ self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
}
}
@@ -572,14 +527,48 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
fn status(&self) -> WorkerStatus {
WorkerStatus {
- queue_length: Some(self.todo.len() as u64),
+ queue_length: Some(self.todo.as_ref().map(|x| x.partitions.len()).unwrap_or(0) as u64),
..Default::default()
}
}
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- if let Some(partition) = self.pop_task() {
- self.syncer.sync_partition(&partition, must_exit).await?;
+ self.check_add_full_sync();
+
+ if let Some(todo) = &mut self.todo {
+ let partition = todo.partitions.pop().unwrap();
+
+ // process partition
+ if let Err(e) = self.syncer.sync_partition(&partition, must_exit).await {
+ error!(
+ "{}: Failed to sync partition {:?}: {}",
+ F::TABLE_NAME,
+ partition,
+ e
+ );
+ // if error, put partition back at the other side of the queue,
+ // so that other partitions will be tried in the meantime
+ todo.partitions.insert(0, partition);
+ // TODO: returning an error here will cause the background job worker
+ // to delay this task for some time, but maybe we don't want to
+ // delay it if there are lots of failures from nodes that are gone
+ // (we also don't want zero delays as that will cause lots of useless retries)
+ return Err(e);
+ }
+
+ if todo.partitions.is_empty() {
+ info!(
+ "{}: Completed full sync for ack layout version {}",
+ F::TABLE_NAME,
+ todo.layout_version
+ );
+ self.syncer
+ .system
+ .layout_manager
+ .sync_table_until(F::TABLE_NAME, todo.layout_version);
+ self.todo = None;
+ }
+
Ok(WorkerState::Busy)
} else {
Ok(WorkerState::Idle)
@@ -593,22 +582,16 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
self.add_full_sync();
}
},
- _ = self.ring_recv.changed() => {
- let new_ring = self.ring_recv.borrow();
- if !Arc::ptr_eq(&new_ring, &self.ring) {
- self.ring = new_ring.clone();
- drop(new_ring);
- debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
- self.add_full_sync();
- }
+ _ = self.layout_notify.notified() => {
+ self.check_add_full_sync();
},
_ = tokio::time::sleep_until(self.next_full_sync.into()) => {
self.add_full_sync();
}
}
- match self.todo.is_empty() {
- false => WorkerState::Busy,
- true => WorkerState::Idle,
+ match self.todo.is_some() {
+ true => WorkerState::Busy,
+ false => WorkerState::Idle,
}
}
}
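
The patch relies on `QuorumSetResultTracker` from `garage_rpc::rpc_helper`, whose implementation is not part of this diff. As a rough illustration of the bookkeeping `sync_partition` now depends on, here is a standalone sketch (simplified, hypothetical types — not the actual Garage API): each partition has one or more storage sets (old and new node sets during a layout transition), per-node results are registered as they arrive, and the sync is declared failed as soon as some set has lost so many nodes that it can no longer reach the write quorum. In the real code the tracker also exposes the node map used to spawn `do_sync_with` futures and a `quorum_error()` constructor; only the failure-counting logic is mirrored here.

```rust
use std::collections::HashMap;

// Hypothetical, simplified stand-in for QuorumSetResultTracker.
struct SetQuorumTracker {
    sets: Vec<Vec<u64>>,         // node ids of each storage set of the partition
    results: HashMap<u64, bool>, // node id -> did the sync with that node succeed?
    quorum: usize,               // write quorum required per set
}

impl SetQuorumTracker {
    fn new(sets: &[Vec<u64>], quorum: usize) -> Self {
        Self {
            sets: sets.to_vec(),
            results: HashMap::new(),
            quorum,
        }
    }

    /// All nodes that appear in at least one storage set (deduplicated);
    /// these are the peers the syncer would contact.
    fn nodes(&self) -> Vec<u64> {
        let mut nodes: Vec<u64> = self.sets.iter().flatten().copied().collect();
        nodes.sort_unstable();
        nodes.dedup();
        nodes
    }

    fn register_result(&mut self, node: u64, success: bool) {
        self.results.insert(node, success);
    }

    /// True if some storage set can no longer reach the quorum,
    /// even if every node that has not answered yet were to succeed.
    fn too_many_failures(&self) -> bool {
        self.sets.iter().any(|set| {
            let failed = set
                .iter()
                .filter(|n| self.results.get(*n) == Some(&false))
                .count();
            set.len() - failed < self.quorum
        })
    }
}

fn main() {
    // One partition, two storage sets of 3 nodes (mid layout transition),
    // with a write quorum of 2 per set.
    let mut tracker = SetQuorumTracker::new(&[vec![1, 2, 3], vec![3, 4, 5]], 2);
    for node in tracker.nodes() {
        // Pretend nodes 4 and 5 are unreachable.
        tracker.register_result(node, node < 4);
    }
    // The second set {3, 4, 5} has only one success left, below the quorum of 2,
    // so the partition sync as a whole must be reported as failed.
    assert!(tracker.too_many_failures());
    println!("quorum lost on at least one storage set");
}
```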