aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r--src/table/replication/fullcopy.rs25
-rw-r--r--src/table/replication/parameters.rs19
-rw-r--r--src/table/replication/sharded.rs39
-rw-r--r--src/table/sync.rs178
4 files changed, 148 insertions, 113 deletions
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index a5c83d0f..5653a229 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -1,3 +1,4 @@
+use std::iter::FromIterator;
use std::sync::Arc;
use garage_rpc::layout::*;
@@ -6,10 +7,17 @@ use garage_util::data::*;
use crate::replication::*;
+// TODO: find a way to track layout changes for this as well
+// The hard thing is that this data is stored also on gateway nodes,
+// whereas sharded data is stored only on non-Gateway nodes (storage nodes)
+// Also we want to be more tolerant to failures of gateways so we don't
+// want to do too much holding back of data when progress of gateway
+// nodes is not reported in the layout history's ack/sync/sync_ack maps.
+
/// Full replication schema: all nodes store everything
-/// Writes are disseminated in an epidemic manner in the network
/// Advantage: do all reads locally, extremely fast
/// Inconvenient: only suitable to reasonably small tables
+/// Inconvenient: if some writes fail, nodes will read outdated data
#[derive(Clone)]
pub struct TableFullReplication {
/// The membership manager of this node
@@ -44,7 +52,18 @@ impl TableReplication for TableFullReplication {
fn partition_of(&self, _hash: &Hash) -> Partition {
0u16
}
- fn partitions(&self) -> Vec<(Partition, Hash)> {
- vec![(0u16, [0u8; 32].into())]
+
+ fn sync_partitions(&self) -> SyncPartitions {
+ let layout = self.system.cluster_layout();
+ let layout_version = layout.current().version;
+ SyncPartitions {
+ layout_version,
+ partitions: vec![SyncPartition {
+ partition: 0u16,
+ first_hash: [0u8; 32].into(),
+ last_hash: [0xff; 32].into(),
+ storage_nodes: Vec::from_iter(layout.current().node_ids().to_vec()),
+ }],
+ }
}
}
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index 19b306f2..2a7d3585 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -20,6 +20,21 @@ pub trait TableReplication: Send + Sync + 'static {
// Accessing partitions, for Merkle tree & sync
/// Get partition for data with given hash
fn partition_of(&self, hash: &Hash) -> Partition;
- /// List of existing partitions
- fn partitions(&self) -> Vec<(Partition, Hash)>;
+
+ /// List of partitions and nodes to sync with in current layout
+ fn sync_partitions(&self) -> SyncPartitions;
+}
+
+#[derive(Debug)]
+pub struct SyncPartitions {
+ pub layout_version: u64,
+ pub partitions: Vec<SyncPartition>,
+}
+
+#[derive(Debug)]
+pub struct SyncPartition {
+ pub partition: Partition,
+ pub first_hash: Hash,
+ pub last_hash: Hash,
+ pub storage_nodes: Vec<Uuid>,
}
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index 793d87fd..f02b1d66 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -51,7 +51,42 @@ impl TableReplication for TableShardedReplication {
fn partition_of(&self, hash: &Hash) -> Partition {
self.system.cluster_layout().current().partition_of(hash)
}
- fn partitions(&self) -> Vec<(Partition, Hash)> {
- self.system.cluster_layout().current().partitions()
+
+ fn sync_partitions(&self) -> SyncPartitions {
+ let layout = self.system.cluster_layout();
+ let layout_version = layout.all_ack();
+
+ let mut partitions = layout
+ .current()
+ .partitions()
+ .map(|(partition, first_hash)| {
+ let mut storage_nodes = layout
+ .write_sets_of(&first_hash)
+ .map(|x| x.into_iter())
+ .flatten()
+ .collect::<Vec<_>>();
+ storage_nodes.sort();
+ storage_nodes.dedup();
+ SyncPartition {
+ partition,
+ first_hash,
+ last_hash: [0u8; 32].into(), // filled in just after
+ storage_nodes,
+ }
+ })
+ .collect::<Vec<_>>();
+
+ for i in 0..partitions.len() {
+ partitions[i].last_hash = if i + 1 < partitions.len() {
+ partitions[i + 1].first_hash
+ } else {
+ [0xFFu8; 32].into()
+ };
+ }
+
+ SyncPartitions {
+ layout_version,
+ partitions,
+ }
}
}
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 4355bd9e..43636faa 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -6,7 +6,7 @@ use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use futures_util::stream::*;
use opentelemetry::KeyValue;
-use rand::Rng;
+use rand::prelude::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use tokio::select;
@@ -52,16 +52,6 @@ impl Rpc for SyncRpc {
type Response = Result<SyncRpc, Error>;
}
-#[derive(Debug, Clone)]
-struct TodoPartition {
- partition: Partition,
- begin: Hash,
- end: Hash,
-
- // Are we a node that stores this partition or not?
- retain: bool,
-}
-
impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
pub(crate) fn new(
system: Arc<System>,
@@ -92,9 +82,9 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
bg.spawn_worker(SyncWorker {
syncer: self.clone(),
layout_notify: self.system.layout_notify(),
- layout_version: self.system.cluster_layout().current().version,
+ layout_versions: self.system.cluster_layout().sync_versions(),
add_full_sync_rx,
- todo: vec![],
+ todo: None,
next_full_sync: Instant::now() + Duration::from_secs(20),
});
}
@@ -112,31 +102,26 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
async fn sync_partition(
self: &Arc<Self>,
- partition: &TodoPartition,
+ partition: &SyncPartition,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
- if partition.retain {
- let my_id = self.system.id;
-
- let nodes = self
- .data
- .replication
- .write_nodes(&partition.begin)
- .into_iter()
- .filter(|node| *node != my_id)
- .collect::<Vec<_>>();
+ let my_id = self.system.id;
+ let retain = partition.storage_nodes.contains(&my_id);
+ if retain {
debug!(
"({}) Syncing {:?} with {:?}...",
F::TABLE_NAME,
partition,
- nodes
+ partition.storage_nodes
);
- let mut sync_futures = nodes
+ let mut sync_futures = partition
+ .storage_nodes
.iter()
+ .filter(|node| **node != my_id)
.map(|node| {
self.clone()
- .do_sync_with(partition.clone(), *node, must_exit.clone())
+ .do_sync_with(&partition, *node, must_exit.clone())
})
.collect::<FuturesUnordered<_>>();
@@ -147,14 +132,14 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
warn!("({}) Sync error: {}", F::TABLE_NAME, e);
}
}
- if n_errors > self.data.replication.max_write_errors() {
+ if n_errors > 0 {
return Err(Error::Message(format!(
- "Sync failed with too many nodes (should have been: {:?}).",
- nodes
+ "Sync failed with {} nodes.",
+ n_errors
)));
}
} else {
- self.offload_partition(&partition.begin, &partition.end, must_exit)
+ self.offload_partition(&partition.first_hash, &partition.last_hash, must_exit)
.await?;
}
@@ -285,7 +270,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
async fn do_sync_with(
self: Arc<Self>,
- partition: TodoPartition,
+ partition: &SyncPartition,
who: Uuid,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
@@ -492,76 +477,23 @@ impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSync
struct SyncWorker<F: TableSchema, R: TableReplication> {
syncer: Arc<TableSyncer<F, R>>,
+
layout_notify: Arc<Notify>,
- layout_version: u64,
+ layout_versions: (u64, u64, u64),
+
add_full_sync_rx: mpsc::UnboundedReceiver<()>,
- todo: Vec<TodoPartition>,
next_full_sync: Instant,
+
+ todo: Option<SyncPartitions>,
}
impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
fn add_full_sync(&mut self) {
- let system = &self.syncer.system;
- let data = &self.syncer.data;
-
- let my_id = system.id;
-
- self.todo.clear();
-
- let partitions = data.replication.partitions();
-
- for i in 0..partitions.len() {
- let begin = partitions[i].1;
-
- let end = if i + 1 < partitions.len() {
- partitions[i + 1].1
- } else {
- [0xFFu8; 32].into()
- };
-
- let nodes = data.replication.write_nodes(&begin);
-
- let retain = nodes.contains(&my_id);
- if !retain {
- // Check if we have some data to send, otherwise skip
- match data.store.range(begin..end) {
- Ok(mut iter) => {
- if iter.next().is_none() {
- continue;
- }
- }
- Err(e) => {
- warn!("DB error in add_full_sync: {}", e);
- continue;
- }
- }
- }
-
- self.todo.push(TodoPartition {
- partition: partitions[i].0,
- begin,
- end,
- retain,
- });
- }
-
+ let mut partitions = self.syncer.data.replication.sync_partitions();
+ partitions.partitions.shuffle(&mut thread_rng());
+ self.todo = Some(partitions);
self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
}
-
- fn pop_task(&mut self) -> Option<TodoPartition> {
- if self.todo.is_empty() {
- return None;
- }
-
- let i = rand::thread_rng().gen_range(0..self.todo.len());
- if i == self.todo.len() - 1 {
- self.todo.pop()
- } else {
- let replacement = self.todo.pop().unwrap();
- let ret = std::mem::replace(&mut self.todo[i], replacement);
- Some(ret)
- }
- }
}
#[async_trait]
@@ -572,18 +504,46 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
fn status(&self) -> WorkerStatus {
WorkerStatus {
- queue_length: Some(self.todo.len() as u64),
+ queue_length: Some(self.todo.as_ref().map(|x| x.partitions.len()).unwrap_or(0) as u64),
..Default::default()
}
}
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- if let Some(partition) = self.pop_task() {
- self.syncer.sync_partition(&partition, must_exit).await?;
- Ok(WorkerState::Busy)
- } else {
- Ok(WorkerState::Idle)
+ if let Some(todo) = &mut self.todo {
+ let partition = todo.partitions.pop().unwrap();
+
+ // process partition
+ if let Err(e) = self.syncer.sync_partition(&partition, must_exit).await {
+ error!(
+ "{}: Failed to sync partition {:?}: {}",
+ F::TABLE_NAME,
+ partition,
+ e
+ );
+ // if error, put partition back at the other side of the queue,
+ // so that other partitions will be tried in the meantime
+ todo.partitions.insert(0, partition);
+ // TODO: returning an error here will cause the background job worker
+ // to delay this task for some time, but maybe we don't want to
+ // delay it if there are lots of failures from nodes that are gone
+ // (we also don't want zero delays as that will cause lots of useless retries)
+ return Err(e);
+ }
+
+ // done
+ if !todo.partitions.is_empty() {
+ return Ok(WorkerState::Busy);
+ }
+
+ self.syncer
+ .system
+ .layout_manager
+ .sync_table_until(F::TABLE_NAME, todo.layout_version);
}
+
+ self.todo = None;
+ Ok(WorkerState::Idle)
}
async fn wait_for_work(&mut self) -> WorkerState {
@@ -594,10 +554,16 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
}
},
_ = self.layout_notify.notified() => {
- let new_version = self.syncer.system.cluster_layout().current().version;
- if new_version > self.layout_version {
- self.layout_version = new_version;
- debug!("({}) Layout changed, adding full sync to syncer todo list", F::TABLE_NAME);
+ let layout_versions = self.syncer.system.cluster_layout().sync_versions();
+ if layout_versions != self.layout_versions {
+ self.layout_versions = layout_versions;
+ debug!(
+ "({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list",
+ F::TABLE_NAME,
+ layout_versions.0,
+ layout_versions.1,
+ layout_versions.2
+ );
self.add_full_sync();
}
},
@@ -605,9 +571,9 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
self.add_full_sync();
}
}
- match self.todo.is_empty() {
- false => WorkerState::Busy,
- true => WorkerState::Idle,
+ match self.todo.is_some() {
+ true => WorkerState::Busy,
+ false => WorkerState::Idle,
}
}
}