diff options
Diffstat (limited to 'src/table/gc.rs')
-rw-r--r-- | src/table/gc.rs | 80 |
1 files changed, 42 insertions, 38 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs index 73e08827..c03648ef 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use async_trait::async_trait; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -13,9 +14,8 @@ use tokio::sync::watch; use garage_util::data::*; use garage_util::error::Error; -use garage_rpc::membership::System; -use garage_rpc::rpc_client::*; -use garage_rpc::rpc_server::*; +use garage_rpc::system::System; +use garage_rpc::*; use crate::data::*; use crate::replication::*; @@ -24,11 +24,11 @@ use crate::schema::*; const TABLE_GC_BATCH_SIZE: usize = 1024; const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); -pub struct TableGc<F: TableSchema, R: TableReplication> { +pub struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> { system: Arc<System>, data: Arc<TableData<F, R>>, - rpc_client: Arc<RpcClient<GcRpc>>, + endpoint: Arc<Endpoint<GcRpc, Self>>, } #[derive(Serialize, Deserialize)] @@ -36,30 +36,30 @@ enum GcRpc { Update(Vec<ByteBuf>), DeleteIfEqualHash(Vec<(ByteBuf, Hash)>), Ok, + Error(String), } -impl RpcMessage for GcRpc {} +impl Message for GcRpc { + type Response = GcRpc; +} impl<F, R> TableGc<F, R> where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch( - system: Arc<System>, - data: Arc<TableData<F, R>>, - rpc_server: &mut RpcServer, - ) -> Arc<Self> { - let rpc_path = format!("table_{}/gc", data.name); - let rpc_client = system.rpc_client::<GcRpc>(&rpc_path); + pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> { + let endpoint = system + .netapp + .endpoint(format!("garage_table/gc.rs/Rpc:{}", data.name)); let gc = Arc::new(Self { system: system.clone(), data: data.clone(), - rpc_client, + endpoint, }); - gc.register_handler(rpc_server, rpc_path); + gc.endpoint.set_handler(gc.clone()); let gc1 = gc.clone(); system.background.spawn_worker( @@ -168,7 +168,7 @@ where async fn try_send_and_delete( &self, - nodes: Vec<Uuid>, + nodes: Vec<NodeID>, items: Vec<(ByteBuf, Hash, ByteBuf)>, ) -> Result<(), Error> { let n_items = items.len(); @@ -180,11 +180,15 @@ where deletes.push((k, vhash)); } - self.rpc_client + self.system + .rpc .try_call_many( + &self.endpoint, &nodes[..], GcRpc::Update(updates), - RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND) + .with_quorum(nodes.len()) + .with_timeout(TABLE_GC_RPC_TIMEOUT), ) .await?; @@ -193,11 +197,15 @@ where self.data.name, n_items ); - self.rpc_client + self.system + .rpc .try_call_many( + &self.endpoint, &nodes[..], GcRpc::DeleteIfEqualHash(deletes.clone()), - RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND) + .with_quorum(nodes.len()) + .with_timeout(TABLE_GC_RPC_TIMEOUT), ) .await?; @@ -217,24 +225,7 @@ where Ok(()) } - // ---- RPC HANDLER ---- - - fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) { - let self2 = self.clone(); - rpc_server.add_handler::<GcRpc, _, _>(path, move |msg, _addr| { - let self2 = self2.clone(); - async move { self2.handle_rpc(&msg).await } - }); - - let self2 = self.clone(); - self.rpc_client - .set_local_handler(self.system.id, move |msg| { - let self2 = self2.clone(); - async move { self2.handle_rpc(&msg).await } - }); - } - - async fn handle_rpc(self: &Arc<Self>, message: &GcRpc) -> Result<GcRpc, Error> { + async fn handle_rpc(&self, message: &GcRpc) -> Result<GcRpc, Error> { match message { GcRpc::Update(items) => { self.data.update_many(items)?; @@ -251,3 +242,16 @@ where } } } + +#[async_trait] +impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> GcRpc { + self.handle_rpc(message) + .await + .unwrap_or_else(|e| GcRpc::Error(format!("{}", e))) + } +} |