diff options
Diffstat (limited to 'src/table/gc.rs')
-rw-r--r-- | src/table/gc.rs | 30 |
1 files changed, 15 insertions, 15 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs index 2dcbcaa0..73e08827 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -24,23 +24,23 @@ use crate::schema::*; const TABLE_GC_BATCH_SIZE: usize = 1024; const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); -pub struct TableGC<F: TableSchema, R: TableReplication> { +pub struct TableGc<F: TableSchema, R: TableReplication> { system: Arc<System>, data: Arc<TableData<F, R>>, - rpc_client: Arc<RpcClient<GcRPC>>, + rpc_client: Arc<RpcClient<GcRpc>>, } #[derive(Serialize, Deserialize)] -enum GcRPC { +enum GcRpc { Update(Vec<ByteBuf>), DeleteIfEqualHash(Vec<(ByteBuf, Hash)>), Ok, } -impl RpcMessage for GcRPC {} +impl RpcMessage for GcRpc {} -impl<F, R> TableGC<F, R> +impl<F, R> TableGc<F, R> where F: TableSchema + 'static, R: TableReplication + 'static, @@ -51,7 +51,7 @@ where rpc_server: &mut RpcServer, ) -> Arc<Self> { let rpc_path = format!("table_{}/gc", data.name); - let rpc_client = system.rpc_client::<GcRPC>(&rpc_path); + let rpc_client = system.rpc_client::<GcRpc>(&rpc_path); let gc = Arc::new(Self { system: system.clone(), @@ -168,7 +168,7 @@ where async fn try_send_and_delete( &self, - nodes: Vec<UUID>, + nodes: Vec<Uuid>, items: Vec<(ByteBuf, Hash, ByteBuf)>, ) -> Result<(), Error> { let n_items = items.len(); @@ -183,7 +183,7 @@ where self.rpc_client .try_call_many( &nodes[..], - GcRPC::Update(updates), + GcRpc::Update(updates), RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT), ) .await?; @@ -196,7 +196,7 @@ where self.rpc_client .try_call_many( &nodes[..], - GcRPC::DeleteIfEqualHash(deletes.clone()), + GcRpc::DeleteIfEqualHash(deletes.clone()), RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT), ) .await?; @@ -221,7 +221,7 @@ where fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) { let self2 = self.clone(); - rpc_server.add_handler::<GcRPC, _, _>(path, move |msg, _addr| { + rpc_server.add_handler::<GcRpc, _, _>(path, move |msg, _addr| { let self2 = self2.clone(); async move { self2.handle_rpc(&msg).await } }); @@ -234,18 +234,18 @@ where }); } - async fn handle_rpc(self: &Arc<Self>, message: &GcRPC) -> Result<GcRPC, Error> { + async fn handle_rpc(self: &Arc<Self>, message: &GcRpc) -> Result<GcRpc, Error> { match message { - GcRPC::Update(items) => { + GcRpc::Update(items) => { self.data.update_many(items)?; - Ok(GcRPC::Ok) + Ok(GcRpc::Ok) } - GcRPC::DeleteIfEqualHash(items) => { + GcRpc::DeleteIfEqualHash(items) => { for (key, vhash) in items.iter() { self.data.delete_if_equal_hash(&key[..], *vhash)?; self.todo_remove_if_equal(&key[..], *vhash)?; } - Ok(GcRPC::Ok) + Ok(GcRpc::Ok) } _ => Err(Error::Message("Unexpected GC RPC".to_string())), } |