Diffstat (limited to 'src/table/sync.rs')
-rw-r--r-- | src/table/sync.rs | 97
1 file changed, 59 insertions, 38 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs
index a3afbbba..c5db0987 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -2,6 +2,7 @@ use std::collections::VecDeque;
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};
 
+use async_trait::async_trait;
 use futures::select;
 use futures_util::future::*;
 use futures_util::stream::*;
@@ -13,10 +14,9 @@ use tokio::sync::{mpsc, watch};
 use garage_util::data::*;
 use garage_util::error::Error;
 
-use garage_rpc::membership::System;
 use garage_rpc::ring::*;
-use garage_rpc::rpc_client::*;
-use garage_rpc::rpc_server::*;
+use garage_rpc::system::System;
+use garage_rpc::*;
 
 use crate::data::*;
 use crate::merkle::*;
@@ -28,13 +28,13 @@ const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
 // Do anti-entropy every 10 minutes
 const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
 
-pub struct TableSyncer<F: TableSchema, R: TableReplication> {
+pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> {
 	system: Arc<System>,
 	data: Arc<TableData<F, R>>,
 	merkle: Arc<MerkleUpdater<F, R>>,
 
 	todo: Mutex<SyncTodo>,
-	rpc_client: Arc<RpcClient<SyncRpc>>,
+	endpoint: Arc<Endpoint<SyncRpc, Self>>,
 }
 
 #[derive(Serialize, Deserialize)]
@@ -45,9 +45,12 @@ pub(crate) enum SyncRpc {
 	Node(MerkleNodeKey, MerkleNode),
 	Items(Vec<Arc<ByteBuf>>),
 	Ok,
+	Error(String),
 }
 
-impl RpcMessage for SyncRpc {}
+impl Message for SyncRpc {
+	type Response = SyncRpc;
+}
 
 struct SyncTodo {
 	todo: Vec<TodoPartition>,
@@ -72,10 +75,10 @@ where
 		system: Arc<System>,
 		data: Arc<TableData<F, R>>,
 		merkle: Arc<MerkleUpdater<F, R>>,
-		rpc_server: &mut RpcServer,
 	) -> Arc<Self> {
-		let rpc_path = format!("table_{}/sync", data.name);
-		let rpc_client = system.rpc_client::<SyncRpc>(&rpc_path);
+		let endpoint = system
+			.netapp
+			.endpoint(format!("garage_table/sync.rs/Rpc:{}", data.name));
 
 		let todo = SyncTodo { todo: vec![] };
 
@@ -84,10 +87,10 @@ where
 			data: data.clone(),
 			merkle,
 			todo: Mutex::new(todo),
-			rpc_client,
+			endpoint,
 		});
 
-		syncer.register_handler(rpc_server, rpc_path);
+		syncer.endpoint.set_handler(syncer.clone());
 
 		let (busy_tx, busy_rx) = mpsc::unbounded_channel();
 
@@ -112,21 +115,6 @@ where
 		syncer
 	}
 
-	fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
-		let self2 = self.clone();
-		rpc_server.add_handler::<SyncRpc, _, _>(path, move |msg, _addr| {
-			let self2 = self2.clone();
-			async move { self2.handle_rpc(&msg).await }
-		});
-
-		let self2 = self.clone();
-		self.rpc_client
-			.set_local_handler(self.system.id, move |msg| {
-				let self2 = self2.clone();
-				async move { self2.handle_rpc(&msg).await }
-			});
-	}
-
 	async fn watcher_task(
 		self: Arc<Self>,
 		mut must_exit: watch::Receiver<bool>,
@@ -317,15 +305,19 @@ where
 	async fn offload_items(
 		self: &Arc<Self>,
 		items: &[(Vec<u8>, Arc<ByteBuf>)],
-		nodes: &[Uuid],
+		nodes: &[NodeID],
 	) -> Result<(), Error> {
 		let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
 
-		self.rpc_client
+		self.system
+			.rpc
 			.try_call_many(
+				&self.endpoint,
 				nodes,
 				SyncRpc::Items(values),
-				RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+				RequestStrategy::with_priority(PRIO_BACKGROUND)
+					.with_quorum(nodes.len())
+					.with_timeout(TABLE_SYNC_RPC_TIMEOUT),
 			)
 			.await?;
 
@@ -362,7 +354,7 @@ where
 	async fn do_sync_with(
 		self: Arc<Self>,
 		partition: TodoPartition,
-		who: Uuid,
+		who: NodeID,
 		must_exit: watch::Receiver<bool>,
 	) -> Result<(), Error> {
 		let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?;
@@ -378,11 +370,14 @@ where
 		// Check if they have the same root checksum
 		// If so, do nothing.
 		let root_resp = self
-			.rpc_client
+			.system
+			.rpc
 			.call(
+				&self.endpoint,
 				who,
 				SyncRpc::RootCkHash(partition.partition, root_ck_hash),
-				TABLE_SYNC_RPC_TIMEOUT,
+				RequestStrategy::with_priority(PRIO_BACKGROUND)
+					.with_timeout(TABLE_SYNC_RPC_TIMEOUT),
 			)
 			.await?;
 
@@ -430,8 +425,15 @@ where
 		// Get Merkle node for this tree position at remote node
 		// and compare it with local node
 		let remote_node = match self
-			.rpc_client
-			.call(who, SyncRpc::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT)
+			.system
+			.rpc
+			.call(
+				&self.endpoint,
+				who,
+				SyncRpc::GetNode(key.clone()),
+				RequestStrategy::with_priority(PRIO_BACKGROUND)
+					.with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+			)
 			.await?
 		{
 			SyncRpc::Node(_, node) => node,
@@ -478,7 +480,7 @@ where
 		Ok(())
 	}
 
-	async fn send_items(&self, who: Uuid, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
+	async fn send_items(&self, who: NodeID, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
 		info!(
 			"({}) Sending {} items to {:?}",
 			self.data.name,
@@ -492,8 +494,15 @@ where
 			.collect::<Vec<_>>();
 
 		let rpc_resp = self
-			.rpc_client
-			.call(who, SyncRpc::Items(values), TABLE_SYNC_RPC_TIMEOUT)
+			.system
+			.rpc
+			.call(
+				&self.endpoint,
+				who,
+				SyncRpc::Items(values),
+				RequestStrategy::with_priority(PRIO_BACKGROUND)
+					.with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+			)
 			.await?;
 		if let SyncRpc::Ok = rpc_resp {
 			Ok(())
@@ -506,7 +515,6 @@ where
 	}
 
 	// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
-
 	async fn handle_rpc(self: &Arc<Self>, message: &SyncRpc) -> Result<SyncRpc, Error> {
 		match message {
 			SyncRpc::RootCkHash(range, h) => {
@@ -527,6 +535,19 @@ where
 	}
 }
 
+#[async_trait]
+impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R>
+where
+	F: TableSchema + 'static,
+	R: TableReplication + 'static,
+{
+	async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> SyncRpc {
+		self.handle_rpc(message)
+			.await
+			.unwrap_or_else(|e| SyncRpc::Error(format!("{}", e)))
+	}
+}
+
 impl SyncTodo {
 	fn add_full_sync<F: TableSchema, R: TableReplication>(
 		&mut self,
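For context, the RPC pattern this diff converges on can be shown outside the table syncer. The sketch below is a minimal illustration assuming the garage_rpc API exactly as it appears in the hunks above (Message, Endpoint, EndpointHandler, NodeID, RequestStrategy, PRIO_BACKGROUND); PingRpc, Pinger, and PING_RPC_TIMEOUT are hypothetical names invented for this example, not part of the codebase.

// Sketch only: PingRpc, Pinger and PING_RPC_TIMEOUT are hypothetical;
// all garage_rpc items are used as they appear in the diff above.
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use serde::{Deserialize, Serialize};

use garage_rpc::system::System;
use garage_rpc::*;
use garage_util::error::Error;

const PING_RPC_TIMEOUT: Duration = Duration::from_secs(10);

// A message type now declares its response type via the Message trait,
// replacing the old empty `impl RpcMessage for ...` marker.
#[derive(Serialize, Deserialize)]
enum PingRpc {
	Ping,
	Pong,
	Error(String),
}

impl Message for PingRpc {
	type Response = PingRpc;
}

struct Pinger {
	system: Arc<System>,
	endpoint: Arc<Endpoint<PingRpc, Self>>,
}

impl Pinger {
	fn new(system: Arc<System>) -> Arc<Self> {
		// An Endpoint replaces the old string-path RpcServer registration;
		// the handler is the component itself rather than a pair of closures.
		let endpoint = system
			.netapp
			.endpoint("garage_table/example.rs/Rpc".to_string());
		let pinger = Arc::new(Pinger { system, endpoint });
		pinger.endpoint.set_handler(pinger.clone());
		pinger
	}

	// Outgoing calls go through system.rpc and take an explicit
	// RequestStrategy, which now carries a priority as well as a timeout.
	async fn ping(&self, who: NodeID) -> Result<(), Error> {
		let resp = self
			.system
			.rpc
			.call(
				&self.endpoint,
				who,
				PingRpc::Ping,
				RequestStrategy::with_priority(PRIO_BACKGROUND)
					.with_timeout(PING_RPC_TIMEOUT),
			)
			.await?;
		match resp {
			PingRpc::Pong => Ok(()),
			_ => Err(Error::Message("unexpected reply".to_string())),
		}
	}
}

// Incoming requests are dispatched through the EndpointHandler trait.
// handle() cannot return a Result, so errors are folded into the message
// type itself, just as the new SyncRpc::Error variant does above.
#[async_trait]
impl EndpointHandler<PingRpc> for Pinger {
	async fn handle(self: &Arc<Self>, message: &PingRpc, _from: NodeID) -> PingRpc {
		match message {
			PingRpc::Ping => PingRpc::Pong,
			_ => PingRpc::Error("unexpected message".to_string()),
		}
	}
}

The infallible handle() signature is why this diff adds SyncRpc::Error(String) and the unwrap_or_else in the EndpointHandler impl: transport-level errors and application-level errors now travel in the same serialized message type.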