aboutsummaryrefslogtreecommitdiff
path: root/src/table/sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r--src/table/sync.rs178
1 files changed, 72 insertions, 106 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 4355bd9e..43636faa 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -6,7 +6,7 @@ use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use futures_util::stream::*;
use opentelemetry::KeyValue;
-use rand::Rng;
+use rand::prelude::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use tokio::select;
@@ -52,16 +52,6 @@ impl Rpc for SyncRpc {
type Response = Result<SyncRpc, Error>;
}
-#[derive(Debug, Clone)]
-struct TodoPartition {
- partition: Partition,
- begin: Hash,
- end: Hash,
-
- // Are we a node that stores this partition or not?
- retain: bool,
-}
-
impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
pub(crate) fn new(
system: Arc<System>,
@@ -92,9 +82,9 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
bg.spawn_worker(SyncWorker {
syncer: self.clone(),
layout_notify: self.system.layout_notify(),
- layout_version: self.system.cluster_layout().current().version,
+ layout_versions: self.system.cluster_layout().sync_versions(),
add_full_sync_rx,
- todo: vec![],
+ todo: None,
next_full_sync: Instant::now() + Duration::from_secs(20),
});
}
@@ -112,31 +102,26 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
async fn sync_partition(
self: &Arc<Self>,
- partition: &TodoPartition,
+ partition: &SyncPartition,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
- if partition.retain {
- let my_id = self.system.id;
-
- let nodes = self
- .data
- .replication
- .write_nodes(&partition.begin)
- .into_iter()
- .filter(|node| *node != my_id)
- .collect::<Vec<_>>();
+ let my_id = self.system.id;
+ let retain = partition.storage_nodes.contains(&my_id);
+ if retain {
debug!(
"({}) Syncing {:?} with {:?}...",
F::TABLE_NAME,
partition,
- nodes
+ partition.storage_nodes
);
- let mut sync_futures = nodes
+ let mut sync_futures = partition
+ .storage_nodes
.iter()
+ .filter(|node| **node != my_id)
.map(|node| {
self.clone()
- .do_sync_with(partition.clone(), *node, must_exit.clone())
+ .do_sync_with(&partition, *node, must_exit.clone())
})
.collect::<FuturesUnordered<_>>();
@@ -147,14 +132,14 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
warn!("({}) Sync error: {}", F::TABLE_NAME, e);
}
}
- if n_errors > self.data.replication.max_write_errors() {
+ if n_errors > 0 {
return Err(Error::Message(format!(
- "Sync failed with too many nodes (should have been: {:?}).",
- nodes
+ "Sync failed with {} nodes.",
+ n_errors
)));
}
} else {
- self.offload_partition(&partition.begin, &partition.end, must_exit)
+ self.offload_partition(&partition.first_hash, &partition.last_hash, must_exit)
.await?;
}
@@ -285,7 +270,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
async fn do_sync_with(
self: Arc<Self>,
- partition: TodoPartition,
+ partition: &SyncPartition,
who: Uuid,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
@@ -492,76 +477,23 @@ impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSync
struct SyncWorker<F: TableSchema, R: TableReplication> {
syncer: Arc<TableSyncer<F, R>>,
+
layout_notify: Arc<Notify>,
- layout_version: u64,
+ layout_versions: (u64, u64, u64),
+
add_full_sync_rx: mpsc::UnboundedReceiver<()>,
- todo: Vec<TodoPartition>,
next_full_sync: Instant,
+
+ todo: Option<SyncPartitions>,
}
impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
fn add_full_sync(&mut self) {
- let system = &self.syncer.system;
- let data = &self.syncer.data;
-
- let my_id = system.id;
-
- self.todo.clear();
-
- let partitions = data.replication.partitions();
-
- for i in 0..partitions.len() {
- let begin = partitions[i].1;
-
- let end = if i + 1 < partitions.len() {
- partitions[i + 1].1
- } else {
- [0xFFu8; 32].into()
- };
-
- let nodes = data.replication.write_nodes(&begin);
-
- let retain = nodes.contains(&my_id);
- if !retain {
- // Check if we have some data to send, otherwise skip
- match data.store.range(begin..end) {
- Ok(mut iter) => {
- if iter.next().is_none() {
- continue;
- }
- }
- Err(e) => {
- warn!("DB error in add_full_sync: {}", e);
- continue;
- }
- }
- }
-
- self.todo.push(TodoPartition {
- partition: partitions[i].0,
- begin,
- end,
- retain,
- });
- }
-
+ let mut partitions = self.syncer.data.replication.sync_partitions();
+ partitions.partitions.shuffle(&mut thread_rng());
+ self.todo = Some(partitions);
self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
}
-
- fn pop_task(&mut self) -> Option<TodoPartition> {
- if self.todo.is_empty() {
- return None;
- }
-
- let i = rand::thread_rng().gen_range(0..self.todo.len());
- if i == self.todo.len() - 1 {
- self.todo.pop()
- } else {
- let replacement = self.todo.pop().unwrap();
- let ret = std::mem::replace(&mut self.todo[i], replacement);
- Some(ret)
- }
- }
}
#[async_trait]
@@ -572,18 +504,46 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
fn status(&self) -> WorkerStatus {
WorkerStatus {
- queue_length: Some(self.todo.len() as u64),
+ queue_length: Some(self.todo.as_ref().map(|x| x.partitions.len()).unwrap_or(0) as u64),
..Default::default()
}
}
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- if let Some(partition) = self.pop_task() {
- self.syncer.sync_partition(&partition, must_exit).await?;
- Ok(WorkerState::Busy)
- } else {
- Ok(WorkerState::Idle)
+ if let Some(todo) = &mut self.todo {
+ let partition = todo.partitions.pop().unwrap();
+
+ // process partition
+ if let Err(e) = self.syncer.sync_partition(&partition, must_exit).await {
+ error!(
+ "{}: Failed to sync partition {:?}: {}",
+ F::TABLE_NAME,
+ partition,
+ e
+ );
+ // if error, put partition back at the other side of the queue,
+ // so that other partitions will be tried in the meantime
+ todo.partitions.insert(0, partition);
+ // TODO: returning an error here will cause the background job worker
+ // to delay this task for some time, but maybe we don't want to
+ // delay it if there are lots of failures from nodes that are gone
+ // (we also don't want zero delays as that will cause lots of useless retries)
+ return Err(e);
+ }
+
+ // done
+ if !todo.partitions.is_empty() {
+ return Ok(WorkerState::Busy);
+ }
+
+ self.syncer
+ .system
+ .layout_manager
+ .sync_table_until(F::TABLE_NAME, todo.layout_version);
}
+
+ self.todo = None;
+ Ok(WorkerState::Idle)
}
async fn wait_for_work(&mut self) -> WorkerState {
@@ -594,10 +554,16 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
}
},
_ = self.layout_notify.notified() => {
- let new_version = self.syncer.system.cluster_layout().current().version;
- if new_version > self.layout_version {
- self.layout_version = new_version;
- debug!("({}) Layout changed, adding full sync to syncer todo list", F::TABLE_NAME);
+ let layout_versions = self.syncer.system.cluster_layout().sync_versions();
+ if layout_versions != self.layout_versions {
+ self.layout_versions = layout_versions;
+ debug!(
+ "({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list",
+ F::TABLE_NAME,
+ layout_versions.0,
+ layout_versions.1,
+ layout_versions.2
+ );
self.add_full_sync();
}
},
@@ -605,9 +571,9 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
self.add_full_sync();
}
}
- match self.todo.is_empty() {
- false => WorkerState::Busy,
- true => WorkerState::Idle,
+ match self.todo.is_some() {
+ true => WorkerState::Busy,
+ false => WorkerState::Idle,
}
}
}