Diffstat (limited to 'src/table')
-rw-r--r--  src/table/data.rs                    |   3
-rw-r--r--  src/table/gc.rs                      |  10
-rw-r--r--  src/table/merkle.rs                  |   2
-rw-r--r--  src/table/replication/fullcopy.rs    |  40
-rw-r--r--  src/table/replication/parameters.rs  |  29
-rw-r--r--  src/table/replication/sharded.rs     |  51
-rw-r--r--  src/table/sync.rs                    | 249
-rw-r--r--  src/table/table.rs                   | 159
8 files changed, 338 insertions, 205 deletions
diff --git a/src/table/data.rs b/src/table/data.rs
index bbfdf58b..7f6b7847 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -254,7 +254,8 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
// of the GC algorithm, as in all cases GC is suspended if
// any node of the partition is unavailable.
let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
- let nodes = self.replication.write_nodes(&pk_hash);
+ // TODO: this probably breaks when the layout changes
+ let nodes = self.replication.storage_nodes(&pk_hash);
if nodes.first() == Some(&self.system.id) {
GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?;
}
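
The rule encoded above, in isolation: among the storage nodes of a partition, only the node listed first enqueues the GC todo entry, so that a single node drives GC for each partition. A minimal sketch, with a simplified node-id type standing in for Uuid:

// NodeId is a hypothetical stand-in for garage_util's 32-byte Uuid.
type NodeId = u64;

// Only the first node of the partition's storage set is responsible for
// driving GC of that partition: a simple, deterministic leader choice.
fn is_gc_responsible(my_id: NodeId, storage_nodes: &[NodeId]) -> bool {
    storage_nodes.first() == Some(&my_id)
}
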
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 5b9124a7..ef788749 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -152,7 +152,7 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
let mut partitions = HashMap::new();
for entry in entries {
let pkh = Hash::try_from(&entry.key[..32]).unwrap();
- let mut nodes = self.data.replication.write_nodes(&pkh);
+ let mut nodes = self.data.replication.storage_nodes(&pkh);
nodes.retain(|x| *x != self.system.id);
nodes.sort();
@@ -227,10 +227,10 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
// GC'ing is not a critical function of the system, so it's not a big
// deal if we can't do it right now.
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
- &nodes[..],
+ &nodes,
GcRpc::Update(updates),
RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
@@ -248,10 +248,10 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
// it means that the garbage collection wasn't completed and has
// to be retried later.
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
- &nodes[..],
+ &nodes,
GcRpc::DeleteIfEqualHash(deletes),
RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
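
The hunk above batches GC entries by the set of storage nodes responsible for them; sorting each node list first means identical sets listed in different orders land in the same batch. A standalone sketch of that grouping, with simplified key and node-id types (not the actual TableGc code):

use std::collections::HashMap;

type NodeId = u64; // stand-in for Uuid
type Key = Vec<u8>;

// Group entries by their sorted storage-node set, so that a single RPC
// batch can be sent per distinct set of nodes.
fn group_by_node_set(entries: Vec<(Vec<NodeId>, Key)>) -> HashMap<Vec<NodeId>, Vec<Key>> {
    let mut batches: HashMap<Vec<NodeId>, Vec<Key>> = HashMap::new();
    for (mut nodes, key) in entries {
        nodes.sort(); // same set, different order => same batch
        batches.entry(nodes).or_default().push(key);
    }
    batches
}
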
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 4577f872..01271c58 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -13,7 +13,7 @@ use garage_util::data::*;
use garage_util::encode::{nonversioned_decode, nonversioned_encode};
use garage_util::error::Error;
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
use crate::data::*;
use crate::replication::*;
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index 18682ace..30122f39 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -1,15 +1,22 @@
use std::sync::Arc;
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
use garage_rpc::system::System;
use garage_util::data::*;
use crate::replication::*;
+// TODO: find a way to track layout changes for this as well.
+// The hard part is that this data is also stored on gateway nodes,
+// whereas sharded data is stored only on non-gateway nodes (storage nodes).
+// We also want to be more tolerant of gateway failures, so we don't
+// want to hold back data too much when the progress of gateway
+// nodes is not reported in the layout history's ack/sync/sync_ack maps.
+
/// Full replication schema: all nodes store everything
-/// Writes are disseminated in an epidemic manner in the network
/// Advantage: do all reads locally, extremely fast
/// Inconvenient: only suitable to reasonably small tables
+/// Inconvenient: if some writes fail, nodes will read outdated data
#[derive(Clone)]
pub struct TableFullReplication {
/// The membership manager of this node
@@ -19,6 +26,13 @@ pub struct TableFullReplication {
}
impl TableReplication for TableFullReplication {
+ type WriteSets = Vec<Vec<Uuid>>;
+
+ fn storage_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
+ let layout = self.system.cluster_layout();
+ layout.current().all_nodes().to_vec()
+ }
+
fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
vec![self.system.id]
}
@@ -26,12 +40,11 @@ impl TableReplication for TableFullReplication {
1
}
- fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
- let ring = self.system.ring.borrow();
- ring.layout.node_ids().to_vec()
+ fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
+ vec![self.storage_nodes(hash)]
}
fn write_quorum(&self) -> usize {
- let nmembers = self.system.ring.borrow().layout.node_ids().len();
+ let nmembers = self.system.cluster_layout().current().all_nodes().len();
if nmembers > self.max_faults {
nmembers - self.max_faults
} else {
@@ -45,7 +58,18 @@ impl TableReplication for TableFullReplication {
fn partition_of(&self, _hash: &Hash) -> Partition {
0u16
}
- fn partitions(&self) -> Vec<(Partition, Hash)> {
- vec![(0u16, [0u8; 32].into())]
+
+ fn sync_partitions(&self) -> SyncPartitions {
+ let layout = self.system.cluster_layout();
+ let layout_version = layout.current().version;
+ SyncPartitions {
+ layout_version,
+ partitions: vec![SyncPartition {
+ partition: 0u16,
+ first_hash: [0u8; 32].into(),
+ last_hash: [0xff; 32].into(),
+ storage_sets: vec![layout.current().all_nodes().to_vec()],
+ }],
+ }
}
}
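
A worked example of the write_quorum rule above: with full replication, a write must reach all nodes except at most max_faults of them, and the quorum never drops below one. A standalone sketch of that arithmetic:

fn full_copy_write_quorum(nmembers: usize, max_faults: usize) -> usize {
    if nmembers > max_faults {
        nmembers - max_faults
    } else {
        1
    }
}

fn main() {
    assert_eq!(full_copy_write_quorum(5, 1), 4); // 5 nodes, tolerate 1 failure
    assert_eq!(full_copy_write_quorum(1, 1), 1); // the quorum is never zero
}
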
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index f00815a2..78470f35 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -1,25 +1,44 @@
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
use garage_util::data::*;
/// Trait to describe how a table shall be replicated
pub trait TableReplication: Send + Sync + 'static {
+ type WriteSets: AsRef<Vec<Vec<Uuid>>> + AsMut<Vec<Vec<Uuid>>> + Send + Sync + 'static;
+
// See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods
+ /// The entire list of all nodes that store a partition
+ fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid>;
+
/// Which nodes to send read requests to
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
/// Responses needed to consider a read successful
fn read_quorum(&self) -> usize;
/// Which nodes to send writes to
- fn write_nodes(&self, hash: &Hash) -> Vec<Uuid>;
- /// Responses needed to consider a write succesfull
+ fn write_sets(&self, hash: &Hash) -> Self::WriteSets;
+ /// Responses needed to consider a write successful in each set
fn write_quorum(&self) -> usize;
fn max_write_errors(&self) -> usize;
// Accessing partitions, for Merkle tree & sync
/// Get partition for data with given hash
fn partition_of(&self, hash: &Hash) -> Partition;
- /// List of existing partitions
- fn partitions(&self) -> Vec<(Partition, Hash)>;
+ /// List of partitions and nodes to sync with in current layout
+ fn sync_partitions(&self) -> SyncPartitions;
+}
+
+#[derive(Debug)]
+pub struct SyncPartitions {
+ pub layout_version: u64,
+ pub partitions: Vec<SyncPartition>,
+}
+
+#[derive(Debug)]
+pub struct SyncPartition {
+ pub partition: Partition,
+ pub first_hash: Hash,
+ pub last_hash: Hash,
+ pub storage_sets: Vec<Vec<Uuid>>,
}
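
For illustration, a minimal sketch of what a third implementation of the new trait could look like: a hypothetical replication mode where one designated node stores everything. SingleNodeReplication is not part of this patch; only the trait and the two structs above are.

use garage_util::data::*;
use garage_rpc::layout::*;

use crate::replication::*;

/// Hypothetical trivial replication mode: one designated node stores everything.
struct SingleNodeReplication {
    node: Uuid,
}

impl TableReplication for SingleNodeReplication {
    // Vec<Vec<Uuid>> satisfies the AsRef/AsMut bounds required of WriteSets
    type WriteSets = Vec<Vec<Uuid>>;

    fn storage_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
        vec![self.node]
    }
    fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
        vec![self.node]
    }
    fn read_quorum(&self) -> usize {
        1
    }
    fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
        // A single write set, containing the single storage node
        vec![self.storage_nodes(hash)]
    }
    fn write_quorum(&self) -> usize {
        1
    }
    fn max_write_errors(&self) -> usize {
        0
    }
    fn partition_of(&self, _hash: &Hash) -> Partition {
        0u16
    }
    fn sync_partitions(&self) -> SyncPartitions {
        // One partition covering the whole hash space, stored on the one node
        SyncPartitions {
            layout_version: 0,
            partitions: vec![SyncPartition {
                partition: 0u16,
                first_hash: [0u8; 32].into(),
                last_hash: [0xff; 32].into(),
                storage_sets: vec![vec![self.node]],
            }],
        }
    }
}
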
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index 1cf964af..8ba3700f 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -1,6 +1,6 @@
use std::sync::Arc;
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
use garage_rpc::system::System;
use garage_util::data::*;
@@ -25,17 +25,21 @@ pub struct TableShardedReplication {
}
impl TableReplication for TableShardedReplication {
+ type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
+
+ fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
+ self.system.cluster_layout().storage_nodes_of(hash)
+ }
+
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
- let ring = self.system.ring.borrow();
- ring.get_nodes(hash, self.replication_factor)
+ self.system.cluster_layout().read_nodes_of(hash)
}
fn read_quorum(&self) -> usize {
self.read_quorum
}
- fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
- let ring = self.system.ring.borrow();
- ring.get_nodes(hash, self.replication_factor)
+ fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
+ self.system.layout_manager.write_sets_of(hash)
}
fn write_quorum(&self) -> usize {
self.write_quorum
@@ -45,9 +49,38 @@ impl TableReplication for TableShardedReplication {
}
fn partition_of(&self, hash: &Hash) -> Partition {
- self.system.ring.borrow().partition_of(hash)
+ self.system.cluster_layout().current().partition_of(hash)
}
- fn partitions(&self) -> Vec<(Partition, Hash)> {
- self.system.ring.borrow().partitions()
+
+ fn sync_partitions(&self) -> SyncPartitions {
+ let layout = self.system.cluster_layout();
+ let layout_version = layout.ack_map_min();
+
+ let mut partitions = layout
+ .current()
+ .partitions()
+ .map(|(partition, first_hash)| {
+ let storage_sets = layout.storage_sets_of(&first_hash);
+ SyncPartition {
+ partition,
+ first_hash,
+ last_hash: [0u8; 32].into(), // filled in just after
+ storage_sets,
+ }
+ })
+ .collect::<Vec<_>>();
+
+ for i in 0..partitions.len() {
+ partitions[i].last_hash = if i + 1 < partitions.len() {
+ partitions[i + 1].first_hash
+ } else {
+ [0xFFu8; 32].into()
+ };
+ }
+
+ SyncPartitions {
+ layout_version,
+ partitions,
+ }
}
}
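
To illustrate the last_hash fill-in above: each partition's range ends where the next one begins, and the final partition is closed with the maximum hash value. A standalone sketch, with u8 standing in for the 32-byte hashes:

fn chain_ranges(first_hashes: &[u8]) -> Vec<(u8, u8)> {
    let mut ranges: Vec<(u8, u8)> = first_hashes.iter().map(|&h| (h, 0)).collect();
    for i in 0..ranges.len() {
        ranges[i].1 = if i + 1 < ranges.len() {
            ranges[i + 1].0 // this partition ends where the next one begins
        } else {
            0xFF // the last partition closes the key space
        };
    }
    ranges
}

fn main() {
    // four partitions starting at 0x00, 0x40, 0x80, 0xC0
    assert_eq!(
        chain_ranges(&[0x00, 0x40, 0x80, 0xC0]),
        vec![(0x00, 0x40), (0x40, 0x80), (0x80, 0xC0), (0xC0, 0xFF)]
    );
}
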
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 92a353c6..cd080df0 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -6,18 +6,19 @@ use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use futures_util::stream::*;
use opentelemetry::KeyValue;
-use rand::Rng;
+use rand::prelude::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use tokio::select;
-use tokio::sync::{mpsc, watch};
+use tokio::sync::{mpsc, watch, Notify};
use garage_util::background::*;
use garage_util::data::*;
use garage_util::encode::{debug_serialize, nonversioned_encode};
use garage_util::error::{Error, OkOrMessage};
-use garage_rpc::ring::*;
+use garage_rpc::layout::*;
+use garage_rpc::rpc_helper::QuorumSetResultTracker;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -52,16 +53,6 @@ impl Rpc for SyncRpc {
type Response = Result<SyncRpc, Error>;
}
-#[derive(Debug, Clone)]
-struct TodoPartition {
- partition: Partition,
- begin: Hash,
- end: Hash,
-
- // Are we a node that stores this partition or not?
- retain: bool,
-}
-
impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
pub(crate) fn new(
system: Arc<System>,
@@ -91,10 +82,10 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
bg.spawn_worker(SyncWorker {
syncer: self.clone(),
- ring_recv: self.system.ring.clone(),
- ring: self.system.ring.borrow().clone(),
+ layout_notify: self.system.layout_notify(),
+ layout_digest: self.system.cluster_layout().sync_digest(),
add_full_sync_rx,
- todo: vec![],
+ todo: None,
next_full_sync: Instant::now() + Duration::from_secs(20),
});
}
@@ -112,53 +103,56 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
async fn sync_partition(
self: &Arc<Self>,
- partition: &TodoPartition,
+ partition: &SyncPartition,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
- if partition.retain {
- let my_id = self.system.id;
-
- let nodes = self
- .data
- .replication
- .write_nodes(&partition.begin)
- .into_iter()
- .filter(|node| *node != my_id)
- .collect::<Vec<_>>();
+ let my_id = self.system.id;
+ let retain = partition.storage_sets.iter().any(|x| x.contains(&my_id));
+ if retain {
debug!(
"({}) Syncing {:?} with {:?}...",
F::TABLE_NAME,
partition,
- nodes
+ partition.storage_sets
);
- let mut sync_futures = nodes
- .iter()
+ let mut result_tracker = QuorumSetResultTracker::new(
+ &partition.storage_sets,
+ self.data.replication.write_quorum(),
+ );
+
+ let mut sync_futures = result_tracker
+ .nodes
+ .keys()
+ .copied()
.map(|node| {
- self.clone()
- .do_sync_with(partition.clone(), *node, must_exit.clone())
+ let must_exit = must_exit.clone();
+ async move {
+ if node == my_id {
+ (node, Ok(()))
+ } else {
+ (node, self.do_sync_with(partition, node, must_exit).await)
+ }
+ }
})
.collect::<FuturesUnordered<_>>();
- let mut n_errors = 0;
- while let Some(r) = sync_futures.next().await {
- if let Err(e) = r {
- n_errors += 1;
- warn!("({}) Sync error: {}", F::TABLE_NAME, e);
+ while let Some((node, res)) = sync_futures.next().await {
+ if let Err(e) = &res {
+ warn!("({}) Sync error with {:?}: {}", F::TABLE_NAME, node, e);
}
+ result_tracker.register_result(node, res);
}
- if n_errors > self.data.replication.max_write_errors() {
- return Err(Error::Message(format!(
- "Sync failed with too many nodes (should have been: {:?}).",
- nodes
- )));
+
+ if result_tracker.too_many_failures() {
+ Err(result_tracker.quorum_error())
+ } else {
+ Ok(())
}
} else {
- self.offload_partition(&partition.begin, &partition.end, must_exit)
- .await?;
+ self.offload_partition(&partition.first_hash, &partition.last_hash, must_exit)
+ .await
}
-
- Ok(())
}
// Offload partition: this partition is not something we are storing,
@@ -188,12 +182,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
}
if !items.is_empty() {
- let nodes = self
- .data
- .replication
- .write_nodes(begin)
- .into_iter()
- .collect::<Vec<_>>();
+ let nodes = self.data.replication.storage_nodes(begin);
if nodes.contains(&self.system.id) {
warn!(
"({}) Interrupting offload as partitions seem to have changed",
@@ -217,7 +206,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
end,
counter
);
- self.offload_items(&items, &nodes[..]).await?;
+ self.offload_items(&items, &nodes).await?;
} else {
break;
}
@@ -244,7 +233,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
}
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
nodes,
@@ -284,8 +273,8 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
}
async fn do_sync_with(
- self: Arc<Self>,
- partition: TodoPartition,
+ self: &Arc<Self>,
+ partition: &SyncPartition,
who: Uuid,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
@@ -305,7 +294,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
// If so, do nothing.
let root_resp = self
.system
- .rpc
+ .rpc_helper()
.call(
&self.endpoint,
who,
@@ -361,7 +350,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
// and compare it with local node
let remote_node = match self
.system
- .rpc
+ .rpc_helper()
.call(
&self.endpoint,
who,
@@ -437,7 +426,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
let rpc_resp = self
.system
- .rpc
+ .rpc_helper()
.call(
&self.endpoint,
who,
@@ -492,75 +481,41 @@ impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSync
struct SyncWorker<F: TableSchema, R: TableReplication> {
syncer: Arc<TableSyncer<F, R>>,
- ring_recv: watch::Receiver<Arc<Ring>>,
- ring: Arc<Ring>,
+
+ layout_notify: Arc<Notify>,
+ layout_digest: SyncLayoutDigest,
+
add_full_sync_rx: mpsc::UnboundedReceiver<()>,
- todo: Vec<TodoPartition>,
next_full_sync: Instant,
+
+ todo: Option<SyncPartitions>,
}
impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
- fn add_full_sync(&mut self) {
- let system = &self.syncer.system;
- let data = &self.syncer.data;
-
- let my_id = system.id;
-
- self.todo.clear();
-
- let partitions = data.replication.partitions();
-
- for i in 0..partitions.len() {
- let begin = partitions[i].1;
-
- let end = if i + 1 < partitions.len() {
- partitions[i + 1].1
- } else {
- [0xFFu8; 32].into()
- };
-
- let nodes = data.replication.write_nodes(&begin);
-
- let retain = nodes.contains(&my_id);
- if !retain {
- // Check if we have some data to send, otherwise skip
- match data.store.range(begin..end) {
- Ok(mut iter) => {
- if iter.next().is_none() {
- continue;
- }
- }
- Err(e) => {
- warn!("DB error in add_full_sync: {}", e);
- continue;
- }
- }
- }
-
- self.todo.push(TodoPartition {
- partition: partitions[i].0,
- begin,
- end,
- retain,
- });
+ fn check_add_full_sync(&mut self) {
+ let layout_digest = self.syncer.system.cluster_layout().sync_digest();
+ if layout_digest != self.layout_digest {
+ self.layout_digest = layout_digest;
+ info!(
+ "({}) Layout versions changed ({:?}), adding full sync to syncer todo list",
+ F::TABLE_NAME,
+ layout_digest,
+ );
+ self.add_full_sync();
}
-
- self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
}
- fn pop_task(&mut self) -> Option<TodoPartition> {
- if self.todo.is_empty() {
- return None;
- }
+ fn add_full_sync(&mut self) {
+ let mut partitions = self.syncer.data.replication.sync_partitions();
+ info!(
+ "{}: Adding full sync for ack layout version {}",
+ F::TABLE_NAME,
+ partitions.layout_version
+ );
- let i = rand::thread_rng().gen_range(0..self.todo.len());
- if i == self.todo.len() - 1 {
- self.todo.pop()
- } else {
- let replacement = self.todo.pop().unwrap();
- let ret = std::mem::replace(&mut self.todo[i], replacement);
- Some(ret)
- }
+ partitions.partitions.shuffle(&mut thread_rng());
+ self.todo = Some(partitions);
+ self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
}
}
@@ -572,14 +527,48 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
fn status(&self) -> WorkerStatus {
WorkerStatus {
- queue_length: Some(self.todo.len() as u64),
+ queue_length: Some(self.todo.as_ref().map(|x| x.partitions.len()).unwrap_or(0) as u64),
..Default::default()
}
}
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- if let Some(partition) = self.pop_task() {
- self.syncer.sync_partition(&partition, must_exit).await?;
+ self.check_add_full_sync();
+
+ if let Some(todo) = &mut self.todo {
+ let partition = todo.partitions.pop().unwrap();
+
+ // process partition
+ if let Err(e) = self.syncer.sync_partition(&partition, must_exit).await {
+ error!(
+ "{}: Failed to sync partition {:?}: {}",
+ F::TABLE_NAME,
+ partition,
+ e
+ );
+ // on error, put the partition back at the other end of the queue,
+ // so that other partitions get tried in the meantime
+ todo.partitions.insert(0, partition);
+ // TODO: returning an error here will cause the background job worker
+ // to delay this task for some time, but maybe we don't want to
+ // delay it if there are lots of failures from nodes that are gone
+ // (we also don't want zero delays as that will cause lots of useless retries)
+ return Err(e);
+ }
+
+ if todo.partitions.is_empty() {
+ info!(
+ "{}: Completed full sync for ack layout version {}",
+ F::TABLE_NAME,
+ todo.layout_version
+ );
+ self.syncer
+ .system
+ .layout_manager
+ .sync_table_until(F::TABLE_NAME, todo.layout_version);
+ self.todo = None;
+ }
+
Ok(WorkerState::Busy)
} else {
Ok(WorkerState::Idle)
@@ -593,22 +582,16 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
self.add_full_sync();
}
},
- _ = self.ring_recv.changed() => {
- let new_ring = self.ring_recv.borrow();
- if !Arc::ptr_eq(&new_ring, &self.ring) {
- self.ring = new_ring.clone();
- drop(new_ring);
- debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
- self.add_full_sync();
- }
+ _ = self.layout_notify.notified() => {
+ self.check_add_full_sync();
},
_ = tokio::time::sleep_until(self.next_full_sync.into()) => {
self.add_full_sync();
}
}
- match self.todo.is_empty() {
- false => WorkerState::Busy,
- true => WorkerState::Idle,
+ match self.todo.is_some() {
+ true => WorkerState::Busy,
+ false => WorkerState::Idle,
}
}
}
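
The QuorumSetResultTracker pattern used in sync_partition above, extracted into isolation. This is a sketch: new(), the public nodes map, register_result(), all_quorums_ok(), too_many_failures() and quorum_error() are used as they appear in this patch, but the exact generic signature of new() is an assumption, and a sequential driver stands in for the FuturesUnordered loop.

use garage_rpc::rpc_helper::QuorumSetResultTracker;
use garage_util::data::*;
use garage_util::error::Error;

// Feed per-node results into the tracker until the outcome is decided:
// either every write set has reached its quorum of OK answers, or some
// set has already seen too many failures to ever reach it.
fn wait_for_quorums(
    sets: &[Vec<Uuid>],
    quorum: usize,
    mut call: impl FnMut(Uuid) -> Result<(), Error>,
) -> Result<(), Error> {
    let mut tracker = QuorumSetResultTracker::new(sets, quorum);
    let nodes = tracker.nodes.keys().copied().collect::<Vec<Uuid>>();
    for node in nodes {
        tracker.register_result(node, call(node));
        if tracker.all_quorums_ok() {
            return Ok(()); // every write set reached its quorum
        }
        if tracker.too_many_failures() {
            break; // some set can no longer reach a quorum
        }
    }
    Err(tracker.quorum_error())
}
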
diff --git a/src/table/table.rs b/src/table/table.rs
index 7ad79677..a5be2910 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -20,6 +20,7 @@ use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
use garage_util::migrate::Migrate;
+use garage_rpc::rpc_helper::QuorumSetResultTracker;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -80,6 +81,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone());
let gc = TableGc::new(system.clone(), data.clone());
+ system.layout_manager.add_table(F::TABLE_NAME);
+
let table = Arc::new(Self {
system,
data,
@@ -117,16 +120,16 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
async fn insert_internal(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
- let who = self.data.replication.write_nodes(&hash);
+ let who = self.data.replication.write_sets(&hash);
let e_enc = Arc::new(ByteBuf::from(e.encode()?));
let rpc = TableRpc::<F>::Update(vec![e_enc]);
self.system
- .rpc
- .try_call_many(
+ .rpc_helper()
+ .try_write_many_sets(
&self.endpoint,
- &who[..],
+ who.as_ref(),
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.write_quorum()),
@@ -141,7 +144,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
self.data.queue_insert(tx, e)
}
- pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error>
+ pub async fn insert_many<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
where
I: IntoIterator<Item = IE> + Send + Sync,
IE: Borrow<F::E> + Send + Sync,
@@ -159,51 +162,123 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
Ok(())
}
- async fn insert_many_internal<I, IE>(&self, entries: I) -> Result<(), Error>
+ async fn insert_many_internal<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
where
I: IntoIterator<Item = IE> + Send + Sync,
IE: Borrow<F::E> + Send + Sync,
{
- let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
-
+ // The different items may have to be stored on different nodes.
+ // Here we batch the items into a single request per node, containing
+ // all of the entries that node must store.
+ // Each entry has to be saved to a specific list of "write sets", i.e. sets
+ // of nodes within which a quorum must be achieved. In normal operation, there
+ // is a single write set, which corresponds to the quorum in the current
+ // cluster layout, but when the layout is being updated, multiple write sets
+ // might have to be handled at once. Here, since we are sending many entries,
+ // we will have to handle many write sets in all cases. The algorithm is thus
+ // to send one request to each node with all the items it must save,
+ // and to keep track of the OK responses within each write set: if for all sets
+ // a quorum of nodes has answered OK, then the insert has succeeded and
+ // consistency properties (read-after-write) are preserved.
+
+ let quorum = self.data.replication.write_quorum();
+
+ // Serialize all entries and compute the write sets for each of them.
+ // In the case of sharded table replication, this also takes an "ack lock"
+ // from the layout manager, to avoid ack'ing newer layout versions that are
+ // not yet taken into account by writes in progress (the ack can happen
+ // later, once all writes that didn't take the new layout into account
+ // are finished). These locks are released when entries_vec is dropped,
+ // i.e. when this function returns.
+ let mut entries_vec = Vec::new();
for entry in entries.into_iter() {
let entry = entry.borrow();
let hash = entry.partition_key().hash();
- let who = self.data.replication.write_nodes(&hash);
+ let mut write_sets = self.data.replication.write_sets(&hash);
+ for set in write_sets.as_mut().iter_mut() {
+ // Sort the nodes in each write set, so that sets containing the
+ // same nodes in possibly different orders can be merged below
+ set.sort();
+ }
let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
- for node in who {
- call_list.entry(node).or_default().push(e_enc.clone());
+ entries_vec.push((write_sets, e_enc));
+ }
+
+ // Compute a deduplicated list of all of the write sets;
+ // the quorum tracker built below indexes each node to the positions
+ // of the sets in which it takes part, to optimize quorum detection.
+ let mut write_sets = entries_vec
+ .iter()
+ .flat_map(|(wss, _)| wss.as_ref().iter().map(|ws| ws.as_slice()))
+ .collect::<Vec<&[Uuid]>>();
+ write_sets.sort();
+ write_sets.dedup();
+
+ let mut result_tracker = QuorumSetResultTracker::new(&write_sets, quorum);
+
+ // Build a map of all nodes to the entries that must be sent to that node.
+ let mut call_list: HashMap<Uuid, Vec<_>> = HashMap::new();
+ for (write_sets, entry_enc) in entries_vec.iter() {
+ for write_set in write_sets.as_ref().iter() {
+ for node in write_set.iter() {
+ let node_entries = call_list.entry(*node).or_default();
+ match node_entries.last() {
+ Some(x) if Arc::ptr_eq(x, entry_enc) => {
+ // skip if entry already in list to send to this node
+ // (could happen if node is in several write sets for this entry)
+ }
+ _ => {
+ node_entries.push(entry_enc.clone());
+ }
+ }
+ }
}
}
- let call_futures = call_list.drain().map(|(node, entries)| async move {
- let rpc = TableRpc::<F>::Update(entries);
-
- let resp = self
- .system
- .rpc
- .call(
- &self.endpoint,
- node,
- rpc,
- RequestStrategy::with_priority(PRIO_NORMAL),
- )
- .await?;
- Ok::<_, Error>((node, resp))
+ // Build futures to actually perform each of the corresponding RPC calls
+ let call_futures = call_list.into_iter().map(|(node, entries)| {
+ let this = self.clone();
+ async move {
+ let rpc = TableRpc::<F>::Update(entries);
+ let resp = this
+ .system
+ .rpc_helper()
+ .call(
+ &this.endpoint,
+ node,
+ rpc,
+ RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(quorum),
+ )
+ .await;
+ (node, resp)
+ }
});
+
+ // Run all requests in parallel thanks to FuturesUnordered, and collect results.
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
- let mut errors = vec![];
- while let Some(resp) = resps.next().await {
- if let Err(e) = resp {
- errors.push(e);
+ while let Some((node, resp)) = resps.next().await {
+ result_tracker.register_result(node, resp.map(|_| ()));
+
+ if result_tracker.all_quorums_ok() {
+ // Success
+
+ // Continue all other requests in background
+ tokio::spawn(async move {
+ resps.collect::<Vec<(Uuid, Result<_, _>)>>().await;
+ });
+
+ return Ok(());
+ }
+
+ if result_tracker.too_many_failures() {
+ // Too many errors in this set, we know we won't get a quorum
+ break;
}
}
- if errors.len() > self.data.replication.max_write_errors() {
- Err(Error::Message("Too many errors".into()))
- } else {
- Ok(())
- }
+
+ // Failure, could not get quorum within at least one set
+ Err(result_tracker.quorum_error())
}
pub async fn get(
@@ -236,14 +311,13 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self
.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
- &who[..],
+ &who,
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(self.data.replication.read_quorum())
- .interrupt_after_quorum(true),
+ .with_quorum(self.data.replication.read_quorum()),
)
.await?;
@@ -332,14 +406,13 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let resps = self
.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
- &who[..],
+ &who,
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(self.data.replication.read_quorum())
- .interrupt_after_quorum(true),
+ .with_quorum(self.data.replication.read_quorum()),
)
.await?;
@@ -411,7 +484,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(what.encode()?));
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
who,
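
Finally, a standalone sketch of the per-node batching used in insert_many_internal above, with simplified types (u64 node ids and u32 payloads stand in for Uuid and Arc<ByteBuf>):

use std::collections::HashMap;

type NodeId = u64; // stand-in for Uuid
type Entry = u32; // stand-in for Arc<ByteBuf>

// Build one batch per node from a list of (write sets, entry) pairs,
// skipping an entry that was just queued for a node (during a layout
// change, a node can appear in several of an entry's write sets).
fn build_call_list(entries: &[(Vec<Vec<NodeId>>, Entry)]) -> HashMap<NodeId, Vec<Entry>> {
    let mut call_list: HashMap<NodeId, Vec<Entry>> = HashMap::new();
    for (write_sets, entry) in entries {
        for set in write_sets {
            for &node in set {
                let node_entries = call_list.entry(node).or_default();
                if node_entries.last() != Some(entry) {
                    node_entries.push(*entry);
                }
            }
        }
    }
    call_list
}
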