aboutsummaryrefslogtreecommitdiff
path: root/src/table/sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r--src/table/sync.rs81
1 files changed, 58 insertions, 23 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 049a16ae..23161d15 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -12,10 +12,13 @@ use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use tokio::sync::{mpsc, watch};
-use garage_rpc::ring::Ring;
use garage_util::data::*;
use garage_util::error::Error;
+use garage_rpc::ring::Ring;
+use garage_rpc::rpc_client::*;
+use garage_rpc::rpc_server::*;
+
use crate::data::*;
use crate::merkle::*;
use crate::replication::*;
@@ -31,6 +34,7 @@ pub struct TableSyncer<F: TableSchema, R: TableReplication> {
aux: Arc<TableAux<F, R>>,
todo: Mutex<SyncTodo>,
+ rpc_client: Arc<RpcClient<SyncRPC>>,
}
type RootCk = Vec<(MerklePartition, Hash)>;
@@ -49,8 +53,12 @@ pub(crate) enum SyncRPC {
CkNoDifference,
GetNode(MerkleNodeKey),
Node(MerkleNodeKey, MerkleNode),
+ Items(Vec<Arc<ByteBuf>>),
+ Ok,
}
+impl RpcMessage for SyncRPC {}
+
struct SyncTodo {
todo: Vec<TodoPartition>,
}
@@ -68,15 +76,25 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- pub(crate) fn launch(data: Arc<TableData<F>>, aux: Arc<TableAux<F, R>>) -> Arc<Self> {
+ pub(crate) fn launch(
+ data: Arc<TableData<F>>,
+ aux: Arc<TableAux<F, R>>,
+ rpc_server: &mut RpcServer,
+ ) -> Arc<Self> {
+ let rpc_path = format!("table_{}/sync", data.name);
+ let rpc_client = aux.system.rpc_client::<SyncRPC>(&rpc_path);
+
let todo = SyncTodo { todo: vec![] };
let syncer = Arc::new(Self {
data: data.clone(),
aux: aux.clone(),
todo: Mutex::new(todo),
+ rpc_client,
});
+ syncer.register_handler(rpc_server, rpc_path);
+
let (busy_tx, busy_rx) = mpsc::unbounded_channel();
let s1 = syncer.clone();
@@ -100,6 +118,21 @@ where
syncer
}
+ fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
+ let self2 = self.clone();
+ rpc_server.add_handler::<SyncRPC, _, _>(path, move |msg, _addr| {
+ let self2 = self2.clone();
+ async move { self2.handle_rpc(&msg).await }
+ });
+
+ let self2 = self.clone();
+ self.rpc_client
+ .set_local_handler(self.aux.system.id, move |msg| {
+ let self2 = self2.clone();
+ async move { self2.handle_rpc(&msg).await }
+ });
+ }
+
async fn watcher_task(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
@@ -278,11 +311,16 @@ where
.into_iter()
.collect::<Vec<_>>();
if nodes.contains(&self.aux.system.id) {
- warn!("({}) Interrupting offload as partitions seem to have changed", self.data.name);
+ warn!(
+ "({}) Interrupting offload as partitions seem to have changed",
+ self.data.name
+ );
break;
}
if nodes.len() < self.aux.replication.write_quorum(&self.aux.system) {
- return Err(Error::Message(format!("Not offloading as we don't have a quorum of nodes to write to.")));
+ return Err(Error::Message(format!(
+ "Not offloading as we don't have a quorum of nodes to write to."
+ )));
}
counter += 1;
@@ -309,11 +347,10 @@ where
nodes: &[UUID],
) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
- let update_msg = Arc::new(TableRPC::<F>::Update(values));
+ let update_msg = Arc::new(SyncRPC::Items(values));
for res in join_all(nodes.iter().map(|to| {
- self.aux
- .rpc_client
+ self.rpc_client
.call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT)
}))
.await
@@ -380,31 +417,30 @@ where
"({}) Sync {:?} with {:?}: partition is empty.",
self.data.name, partition, who
);
- return Ok(())
+ return Ok(());
}
let root_ck_hash = hash_of(&root_ck)?;
// If their root checksum has level > than us, use that as a reference
let root_resp = self
- .aux
.rpc_client
.call(
who,
- TableRPC::<F>::SyncRPC(SyncRPC::RootCkHash(partition.range, root_ck_hash)),
+ SyncRPC::RootCkHash(partition.range, root_ck_hash),
TABLE_SYNC_RPC_TIMEOUT,
)
.await?;
let mut todo = match root_resp {
- TableRPC::<F>::SyncRPC(SyncRPC::CkNoDifference) => {
+ SyncRPC::CkNoDifference => {
debug!(
"({}) Sync {:?} with {:?}: no difference",
self.data.name, partition, who
);
return Ok(());
}
- TableRPC::<F>::SyncRPC(SyncRPC::RootCkList(_, their_root_ck)) => {
+ SyncRPC::RootCkList(_, their_root_ck) => {
let join = join_ordered(&root_ck[..], &their_root_ck[..]);
let mut todo = VecDeque::new();
for (p, v1, v2) in join.iter() {
@@ -464,16 +500,11 @@ where
// Get Merkle node for this tree position at remote node
// and compare it with local node
let remote_node = match self
- .aux
.rpc_client
- .call(
- who,
- TableRPC::<F>::SyncRPC(SyncRPC::GetNode(key.clone())),
- TABLE_SYNC_RPC_TIMEOUT,
- )
+ .call(who, SyncRPC::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT)
.await?
{
- TableRPC::<F>::SyncRPC(SyncRPC::Node(_, node)) => node,
+ SyncRPC::Node(_, node) => node,
x => {
return Err(Error::Message(format!(
"Invalid respone to GetNode RPC: {}",
@@ -525,16 +556,16 @@ where
who
);
- let values = item_value_list.into_iter()
+ let values = item_value_list
+ .into_iter()
.map(|x| Arc::new(ByteBuf::from(x)))
.collect::<Vec<_>>();
let rpc_resp = self
- .aux
.rpc_client
- .call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT)
+ .call(who, SyncRPC::Items(values), TABLE_SYNC_RPC_TIMEOUT)
.await?;
- if let TableRPC::<F>::Ok = rpc_resp {
+ if let SyncRPC::Ok = rpc_resp {
Ok(())
} else {
Err(Error::Message(format!(
@@ -561,6 +592,10 @@ where
let node = self.data.merkle_updater.read_node(&k)?;
Ok(SyncRPC::Node(k.clone(), node))
}
+ SyncRPC::Items(items) => {
+ self.data.update_many(items)?;
+ Ok(SyncRPC::Ok)
+ }
_ => Err(Error::Message(format!("Unexpected sync RPC"))),
}
}