aboutsummaryrefslogtreecommitdiff
path: root/src/table/gc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/gc.rs')
-rw-r--r--src/table/gc.rs30
1 files changed, 15 insertions, 15 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 2dcbcaa0..73e08827 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -24,23 +24,23 @@ use crate::schema::*;
const TABLE_GC_BATCH_SIZE: usize = 1024;
const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
-pub struct TableGC<F: TableSchema, R: TableReplication> {
+pub struct TableGc<F: TableSchema, R: TableReplication> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
- rpc_client: Arc<RpcClient<GcRPC>>,
+ rpc_client: Arc<RpcClient<GcRpc>>,
}
#[derive(Serialize, Deserialize)]
-enum GcRPC {
+enum GcRpc {
Update(Vec<ByteBuf>),
DeleteIfEqualHash(Vec<(ByteBuf, Hash)>),
Ok,
}
-impl RpcMessage for GcRPC {}
+impl RpcMessage for GcRpc {}
-impl<F, R> TableGC<F, R>
+impl<F, R> TableGc<F, R>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
@@ -51,7 +51,7 @@ where
rpc_server: &mut RpcServer,
) -> Arc<Self> {
let rpc_path = format!("table_{}/gc", data.name);
- let rpc_client = system.rpc_client::<GcRPC>(&rpc_path);
+ let rpc_client = system.rpc_client::<GcRpc>(&rpc_path);
let gc = Arc::new(Self {
system: system.clone(),
@@ -168,7 +168,7 @@ where
async fn try_send_and_delete(
&self,
- nodes: Vec<UUID>,
+ nodes: Vec<Uuid>,
items: Vec<(ByteBuf, Hash, ByteBuf)>,
) -> Result<(), Error> {
let n_items = items.len();
@@ -183,7 +183,7 @@ where
self.rpc_client
.try_call_many(
&nodes[..],
- GcRPC::Update(updates),
+ GcRpc::Update(updates),
RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
)
.await?;
@@ -196,7 +196,7 @@ where
self.rpc_client
.try_call_many(
&nodes[..],
- GcRPC::DeleteIfEqualHash(deletes.clone()),
+ GcRpc::DeleteIfEqualHash(deletes.clone()),
RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
)
.await?;
@@ -221,7 +221,7 @@ where
fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
let self2 = self.clone();
- rpc_server.add_handler::<GcRPC, _, _>(path, move |msg, _addr| {
+ rpc_server.add_handler::<GcRpc, _, _>(path, move |msg, _addr| {
let self2 = self2.clone();
async move { self2.handle_rpc(&msg).await }
});
@@ -234,18 +234,18 @@ where
});
}
- async fn handle_rpc(self: &Arc<Self>, message: &GcRPC) -> Result<GcRPC, Error> {
+ async fn handle_rpc(self: &Arc<Self>, message: &GcRpc) -> Result<GcRpc, Error> {
match message {
- GcRPC::Update(items) => {
+ GcRpc::Update(items) => {
self.data.update_many(items)?;
- Ok(GcRPC::Ok)
+ Ok(GcRpc::Ok)
}
- GcRPC::DeleteIfEqualHash(items) => {
+ GcRpc::DeleteIfEqualHash(items) => {
for (key, vhash) in items.iter() {
self.data.delete_if_equal_hash(&key[..], *vhash)?;
self.todo_remove_if_equal(&key[..], *vhash)?;
}
- Ok(GcRPC::Ok)
+ Ok(GcRpc::Ok)
}
_ => Err(Error::Message("Unexpected GC RPC".to_string())),
}