aboutsummaryrefslogtreecommitdiff
path: root/src/table/sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r--src/table/sync.rs97
1 files changed, 59 insertions, 38 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs
index a3afbbba..c5db0987 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -2,6 +2,7 @@ use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
+use async_trait::async_trait;
use futures::select;
use futures_util::future::*;
use futures_util::stream::*;
@@ -13,10 +14,9 @@ use tokio::sync::{mpsc, watch};
use garage_util::data::*;
use garage_util::error::Error;
-use garage_rpc::membership::System;
use garage_rpc::ring::*;
-use garage_rpc::rpc_client::*;
-use garage_rpc::rpc_server::*;
+use garage_rpc::system::System;
+use garage_rpc::*;
use crate::data::*;
use crate::merkle::*;
@@ -28,13 +28,13 @@ const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
// Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
-pub struct TableSyncer<F: TableSchema, R: TableReplication> {
+pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
todo: Mutex<SyncTodo>,
- rpc_client: Arc<RpcClient<SyncRpc>>,
+ endpoint: Arc<Endpoint<SyncRpc, Self>>,
}
#[derive(Serialize, Deserialize)]
@@ -45,9 +45,12 @@ pub(crate) enum SyncRpc {
Node(MerkleNodeKey, MerkleNode),
Items(Vec<Arc<ByteBuf>>),
Ok,
+ Error(String),
}
-impl RpcMessage for SyncRpc {}
+impl Message for SyncRpc {
+ type Response = SyncRpc;
+}
struct SyncTodo {
todo: Vec<TodoPartition>,
@@ -72,10 +75,10 @@ where
system: Arc<System>,
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
- rpc_server: &mut RpcServer,
) -> Arc<Self> {
- let rpc_path = format!("table_{}/sync", data.name);
- let rpc_client = system.rpc_client::<SyncRpc>(&rpc_path);
+ let endpoint = system
+ .netapp
+ .endpoint(format!("garage_table/sync.rs/Rpc:{}", data.name));
let todo = SyncTodo { todo: vec![] };
@@ -84,10 +87,10 @@ where
data: data.clone(),
merkle,
todo: Mutex::new(todo),
- rpc_client,
+ endpoint,
});
- syncer.register_handler(rpc_server, rpc_path);
+ syncer.endpoint.set_handler(syncer.clone());
let (busy_tx, busy_rx) = mpsc::unbounded_channel();
@@ -112,21 +115,6 @@ where
syncer
}
- fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
- let self2 = self.clone();
- rpc_server.add_handler::<SyncRpc, _, _>(path, move |msg, _addr| {
- let self2 = self2.clone();
- async move { self2.handle_rpc(&msg).await }
- });
-
- let self2 = self.clone();
- self.rpc_client
- .set_local_handler(self.system.id, move |msg| {
- let self2 = self2.clone();
- async move { self2.handle_rpc(&msg).await }
- });
- }
-
async fn watcher_task(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
@@ -317,15 +305,19 @@ where
async fn offload_items(
self: &Arc<Self>,
items: &[(Vec<u8>, Arc<ByteBuf>)],
- nodes: &[Uuid],
+ nodes: &[NodeID],
) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
- self.rpc_client
+ self.system
+ .rpc
.try_call_many(
+ &self.endpoint,
nodes,
SyncRpc::Items(values),
- RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND)
+ .with_quorum(nodes.len())
+ .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
)
.await?;
@@ -362,7 +354,7 @@ where
async fn do_sync_with(
self: Arc<Self>,
partition: TodoPartition,
- who: Uuid,
+ who: NodeID,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?;
@@ -378,11 +370,14 @@ where
// Check if they have the same root checksum
// If so, do nothing.
let root_resp = self
- .rpc_client
+ .system
+ .rpc
.call(
+ &self.endpoint,
who,
SyncRpc::RootCkHash(partition.partition, root_ck_hash),
- TABLE_SYNC_RPC_TIMEOUT,
+ RequestStrategy::with_priority(PRIO_BACKGROUND)
+ .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
)
.await?;
@@ -430,8 +425,15 @@ where
// Get Merkle node for this tree position at remote node
// and compare it with local node
let remote_node = match self
- .rpc_client
- .call(who, SyncRpc::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT)
+ .system
+ .rpc
+ .call(
+ &self.endpoint,
+ who,
+ SyncRpc::GetNode(key.clone()),
+ RequestStrategy::with_priority(PRIO_BACKGROUND)
+ .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ )
.await?
{
SyncRpc::Node(_, node) => node,
@@ -478,7 +480,7 @@ where
Ok(())
}
- async fn send_items(&self, who: Uuid, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
+ async fn send_items(&self, who: NodeID, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
info!(
"({}) Sending {} items to {:?}",
self.data.name,
@@ -492,8 +494,15 @@ where
.collect::<Vec<_>>();
let rpc_resp = self
- .rpc_client
- .call(who, SyncRpc::Items(values), TABLE_SYNC_RPC_TIMEOUT)
+ .system
+ .rpc
+ .call(
+ &self.endpoint,
+ who,
+ SyncRpc::Items(values),
+ RequestStrategy::with_priority(PRIO_BACKGROUND)
+ .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ )
.await?;
if let SyncRpc::Ok = rpc_resp {
Ok(())
@@ -506,7 +515,6 @@ where
}
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
-
async fn handle_rpc(self: &Arc<Self>, message: &SyncRpc) -> Result<SyncRpc, Error> {
match message {
SyncRpc::RootCkHash(range, h) => {
@@ -527,6 +535,19 @@ where
}
}
+#[async_trait]
+impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> SyncRpc {
+ self.handle_rpc(message)
+ .await
+ .unwrap_or_else(|e| SyncRpc::Error(format!("{}", e)))
+ }
+}
+
impl SyncTodo {
fn add_full_sync<F: TableSchema, R: TableReplication>(
&mut self,