aboutsummaryrefslogtreecommitdiff
path: root/src/table/gc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/gc.rs')
-rw-r--r--src/table/gc.rs80
1 files changed, 42 insertions, 38 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 73e08827..c03648ef 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
+use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -13,9 +14,8 @@ use tokio::sync::watch;
use garage_util::data::*;
use garage_util::error::Error;
-use garage_rpc::membership::System;
-use garage_rpc::rpc_client::*;
-use garage_rpc::rpc_server::*;
+use garage_rpc::system::System;
+use garage_rpc::*;
use crate::data::*;
use crate::replication::*;
@@ -24,11 +24,11 @@ use crate::schema::*;
const TABLE_GC_BATCH_SIZE: usize = 1024;
const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
-pub struct TableGc<F: TableSchema, R: TableReplication> {
+pub struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
- rpc_client: Arc<RpcClient<GcRpc>>,
+ endpoint: Arc<Endpoint<GcRpc, Self>>,
}
#[derive(Serialize, Deserialize)]
@@ -36,30 +36,30 @@ enum GcRpc {
Update(Vec<ByteBuf>),
DeleteIfEqualHash(Vec<(ByteBuf, Hash)>),
Ok,
+ Error(String),
}
-impl RpcMessage for GcRpc {}
+impl Message for GcRpc {
+ type Response = GcRpc;
+}
impl<F, R> TableGc<F, R>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- pub(crate) fn launch(
- system: Arc<System>,
- data: Arc<TableData<F, R>>,
- rpc_server: &mut RpcServer,
- ) -> Arc<Self> {
- let rpc_path = format!("table_{}/gc", data.name);
- let rpc_client = system.rpc_client::<GcRpc>(&rpc_path);
+ pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
+ let endpoint = system
+ .netapp
+ .endpoint(format!("garage_table/gc.rs/Rpc:{}", data.name));
let gc = Arc::new(Self {
system: system.clone(),
data: data.clone(),
- rpc_client,
+ endpoint,
});
- gc.register_handler(rpc_server, rpc_path);
+ gc.endpoint.set_handler(gc.clone());
let gc1 = gc.clone();
system.background.spawn_worker(
@@ -168,7 +168,7 @@ where
async fn try_send_and_delete(
&self,
- nodes: Vec<Uuid>,
+ nodes: Vec<NodeID>,
items: Vec<(ByteBuf, Hash, ByteBuf)>,
) -> Result<(), Error> {
let n_items = items.len();
@@ -180,11 +180,15 @@ where
deletes.push((k, vhash));
}
- self.rpc_client
+ self.system
+ .rpc
.try_call_many(
+ &self.endpoint,
&nodes[..],
GcRpc::Update(updates),
- RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND)
+ .with_quorum(nodes.len())
+ .with_timeout(TABLE_GC_RPC_TIMEOUT),
)
.await?;
@@ -193,11 +197,15 @@ where
self.data.name, n_items
);
- self.rpc_client
+ self.system
+ .rpc
.try_call_many(
+ &self.endpoint,
&nodes[..],
GcRpc::DeleteIfEqualHash(deletes.clone()),
- RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND)
+ .with_quorum(nodes.len())
+ .with_timeout(TABLE_GC_RPC_TIMEOUT),
)
.await?;
@@ -217,24 +225,7 @@ where
Ok(())
}
- // ---- RPC HANDLER ----
-
- fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
- let self2 = self.clone();
- rpc_server.add_handler::<GcRpc, _, _>(path, move |msg, _addr| {
- let self2 = self2.clone();
- async move { self2.handle_rpc(&msg).await }
- });
-
- let self2 = self.clone();
- self.rpc_client
- .set_local_handler(self.system.id, move |msg| {
- let self2 = self2.clone();
- async move { self2.handle_rpc(&msg).await }
- });
- }
-
- async fn handle_rpc(self: &Arc<Self>, message: &GcRpc) -> Result<GcRpc, Error> {
+ async fn handle_rpc(&self, message: &GcRpc) -> Result<GcRpc, Error> {
match message {
GcRpc::Update(items) => {
self.data.update_many(items)?;
@@ -251,3 +242,16 @@ where
}
}
}
+
+#[async_trait]
+impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> GcRpc {
+ self.handle_rpc(message)
+ .await
+ .unwrap_or_else(|e| GcRpc::Error(format!("{}", e)))
+ }
+}