aboutsummaryrefslogtreecommitdiff
path: root/src/table/sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r--src/table/sync.rs58
1 files changed, 31 insertions, 27 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs
index ac0305e2..9c148393 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -14,6 +14,7 @@ use tokio::sync::{mpsc, watch};
use garage_util::data::*;
use garage_util::error::Error;
+use garage_rpc::membership::System;
use garage_rpc::ring::Ring;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
@@ -29,8 +30,9 @@ const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
pub struct TableSyncer<F: TableSchema, R: TableReplication> {
- data: Arc<TableData<F>>,
- aux: Arc<TableAux<R>>,
+ system: Arc<System>,
+ data: Arc<TableData<F, R>>,
+ merkle: Arc<MerkleUpdater<F, R>>,
todo: Mutex<SyncTodo>,
rpc_client: Arc<RpcClient<SyncRPC>>,
@@ -76,18 +78,20 @@ where
R: TableReplication + 'static,
{
pub(crate) fn launch(
- data: Arc<TableData<F>>,
- aux: Arc<TableAux<R>>,
+ system: Arc<System>,
+ data: Arc<TableData<F, R>>,
+ merkle: Arc<MerkleUpdater<F, R>>,
rpc_server: &mut RpcServer,
) -> Arc<Self> {
let rpc_path = format!("table_{}/sync", data.name);
- let rpc_client = aux.system.rpc_client::<SyncRPC>(&rpc_path);
+ let rpc_client = system.rpc_client::<SyncRPC>(&rpc_path);
let todo = SyncTodo { todo: vec![] };
let syncer = Arc::new(Self {
+ system: system.clone(),
data: data.clone(),
- aux: aux.clone(),
+ merkle,
todo: Mutex::new(todo),
rpc_client,
});
@@ -97,13 +101,13 @@ where
let (busy_tx, busy_rx) = mpsc::unbounded_channel();
let s1 = syncer.clone();
- aux.system.background.spawn_worker(
+ system.background.spawn_worker(
format!("table sync watcher for {}", data.name),
move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
);
let s2 = syncer.clone();
- aux.system.background.spawn_worker(
+ system.background.spawn_worker(
format!("table syncer for {}", data.name),
move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
);
@@ -126,7 +130,7 @@ where
let self2 = self.clone();
self.rpc_client
- .set_local_handler(self.aux.system.id, move |msg| {
+ .set_local_handler(self.system.id, move |msg| {
let self2 = self2.clone();
async move { self2.handle_rpc(&msg).await }
});
@@ -137,8 +141,8 @@ where
mut must_exit: watch::Receiver<bool>,
mut busy_rx: mpsc::UnboundedReceiver<bool>,
) {
- let mut prev_ring: Arc<Ring> = self.aux.system.ring.borrow().clone();
- let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone();
+ let mut prev_ring: Arc<Ring> = self.system.ring.borrow().clone();
+ let mut ring_recv: watch::Receiver<Arc<Ring>> = self.system.ring.clone();
let mut nothing_to_do_since = Some(Instant::now());
while !*must_exit.borrow() {
@@ -178,7 +182,7 @@ where
self.todo
.lock()
.unwrap()
- .add_full_sync(&self.data, &self.aux);
+ .add_full_sync(&self.data, &self.system);
}
async fn syncer_task(
@@ -213,10 +217,10 @@ where
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
if partition.retain {
- let my_id = self.aux.system.id;
+ let my_id = self.system.id;
let nodes = self
- .aux
+ .data
.replication
.write_nodes(&hash_of_merkle_partition(partition.range.begin))
.into_iter()
@@ -242,7 +246,7 @@ where
warn!("({}) Sync error: {}", self.data.name, e);
}
}
- if n_errors > self.aux.replication.max_write_errors() {
+ if n_errors > self.data.replication.max_write_errors() {
return Err(Error::Message(format!(
"Sync failed with too many nodes (should have been: {:?}).",
nodes
@@ -288,19 +292,19 @@ where
if items.len() > 0 {
let nodes = self
- .aux
+ .data
.replication
.write_nodes(&begin)
.into_iter()
.collect::<Vec<_>>();
- if nodes.contains(&self.aux.system.id) {
+ if nodes.contains(&self.system.id) {
warn!(
"({}) Interrupting offload as partitions seem to have changed",
self.data.name
);
break;
}
- if nodes.len() < self.aux.replication.write_quorum() {
+ if nodes.len() < self.data.replication.write_quorum() {
return Err(Error::Message(format!(
"Not offloading as we don't have a quorum of nodes to write to."
)));
@@ -376,7 +380,7 @@ where
partition: u16::to_be_bytes(i),
prefix: vec![],
};
- match self.data.merkle_updater.read_node(&key)? {
+ match self.merkle.read_node(&key)? {
MerkleNode::Empty => (),
x => {
ret.push((key.partition, hash_of(&x)?));
@@ -458,7 +462,7 @@ where
while !todo.is_empty() && !*must_exit.borrow() {
let key = todo.pop_front().unwrap();
- let node = self.data.merkle_updater.read_node(&key)?;
+ let node = self.merkle.read_node(&key)?;
match node {
MerkleNode::Empty => {
@@ -570,7 +574,7 @@ where
}
}
SyncRPC::GetNode(k) => {
- let node = self.data.merkle_updater.read_node(&k)?;
+ let node = self.merkle.read_node(&k)?;
Ok(SyncRPC::Node(k.clone(), node))
}
SyncRPC::Items(items) => {
@@ -585,15 +589,15 @@ where
impl SyncTodo {
fn add_full_sync<F: TableSchema, R: TableReplication>(
&mut self,
- data: &TableData<F>,
- aux: &TableAux<R>,
+ data: &TableData<F, R>,
+ system: &System,
) {
- let my_id = aux.system.id;
+ let my_id = system.id;
self.todo.clear();
- let ring = aux.system.ring.borrow().clone();
- let split_points = aux.replication.split_points(&ring);
+ let ring = system.ring.borrow().clone();
+ let split_points = data.replication.split_points(&ring);
for i in 0..split_points.len() {
let begin: MerklePartition = {
@@ -613,7 +617,7 @@ impl SyncTodo {
let begin_hash = hash_of_merkle_partition(begin);
let end_hash = hash_of_merkle_partition_opt(end);
- let nodes = aux.replication.write_nodes(&begin_hash);
+ let nodes = data.replication.write_nodes(&begin_hash);
let retain = nodes.contains(&my_id);
if !retain {