diff options
Diffstat (limited to 'src/table/table.rs')
-rw-r--r-- | src/table/table.rs | 46 |
1 files changed, 23 insertions, 23 deletions
diff --git a/src/table/table.rs b/src/table/table.rs index 833d5771..eb9bd25c 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -13,7 +13,7 @@ use garage_rpc::membership::System; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; -use crate::crdt::CRDT; +use crate::crdt::Crdt; use crate::data::*; use crate::gc::*; use crate::merkle::*; @@ -28,11 +28,11 @@ pub struct Table<F: TableSchema, R: TableReplication> { pub data: Arc<TableData<F, R>>, pub merkle_updater: Arc<MerkleUpdater<F, R>>, pub syncer: Arc<TableSyncer<F, R>>, - rpc_client: Arc<RpcClient<TableRPC<F>>>, + rpc_client: Arc<RpcClient<TableRpc<F>>>, } #[derive(Serialize, Deserialize)] -pub(crate) enum TableRPC<F: TableSchema> { +pub(crate) enum TableRpc<F: TableSchema> { Ok, ReadEntry(F::P, F::S), @@ -44,7 +44,7 @@ pub(crate) enum TableRPC<F: TableSchema> { Update(Vec<Arc<ByteBuf>>), } -impl<F: TableSchema> RpcMessage for TableRPC<F> {} +impl<F: TableSchema> RpcMessage for TableRpc<F> {} impl<F, R> Table<F, R> where @@ -62,7 +62,7 @@ where rpc_server: &mut RpcServer, ) -> Arc<Self> { let rpc_path = format!("table_{}", name); - let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path); + let rpc_client = system.rpc_client::<TableRpc<F>>(&rpc_path); let data = TableData::new(system.clone(), name, instance, replication, db); @@ -74,7 +74,7 @@ where merkle_updater.clone(), rpc_server, ); - TableGC::launch(system.clone(), data.clone(), rpc_server); + TableGc::launch(system.clone(), data.clone(), rpc_server); let table = Arc::new(Self { system, @@ -95,7 +95,7 @@ where //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); - let rpc = TableRPC::<F>::Update(vec![e_enc]); + let rpc = TableRpc::<F>::Update(vec![e_enc]); self.rpc_client .try_call_many( @@ -121,7 +121,7 @@ where } let call_futures = call_list.drain().map(|(node, entries)| async move { - let rpc = TableRPC::<F>::Update(entries); + let rpc = TableRpc::<F>::Update(entries); let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; Ok::<_, Error>((node, resp)) @@ -150,7 +150,7 @@ where let who = self.data.replication.read_nodes(&hash); //eprintln!("get who: {:?}", who); - let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); + let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self .rpc_client .try_call_many( @@ -165,7 +165,7 @@ where let mut ret = None; let mut not_all_same = false; for resp in resps { - if let TableRPC::ReadEntryResponse(value) = resp { + if let TableRpc::ReadEntryResponse(value) = resp { if let Some(v_bytes) = value { let v = self.data.decode_entry(v_bytes.as_slice())?; ret = match ret { @@ -205,7 +205,7 @@ where let hash = partition_key.hash(); let who = self.data.replication.read_nodes(&hash); - let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); + let rpc = TableRpc::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); let resps = self .rpc_client @@ -221,7 +221,7 @@ where let mut ret = BTreeMap::new(); let mut to_repair = BTreeMap::new(); for resp in resps { - if let TableRPC::Update(entries) = resp { + if let TableRpc::Update(entries) = resp { for entry_bytes in entries.iter() { let entry = self.data.decode_entry(entry_bytes.as_slice())?; let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key()); @@ -261,12 +261,12 @@ where // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS =============== - async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { + async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); self.rpc_client .try_call_many( who, - TableRPC::<F>::Update(vec![what_enc]), + TableRpc::<F>::Update(vec![what_enc]), RequestStrategy::with_quorum(who.len()).with_timeout(TABLE_RPC_TIMEOUT), ) .await?; @@ -277,7 +277,7 @@ where fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) { let self2 = self.clone(); - rpc_server.add_handler::<TableRPC<F>, _, _>(path, move |msg, _addr| { + rpc_server.add_handler::<TableRpc<F>, _, _>(path, move |msg, _addr| { let self2 = self2.clone(); async move { self2.handle(&msg).await } }); @@ -290,21 +290,21 @@ where }); } - async fn handle(self: &Arc<Self>, msg: &TableRPC<F>) -> Result<TableRPC<F>, Error> { + async fn handle(self: &Arc<Self>, msg: &TableRpc<F>) -> Result<TableRpc<F>, Error> { match msg { - TableRPC::ReadEntry(key, sort_key) => { + TableRpc::ReadEntry(key, sort_key) => { let value = self.data.read_entry(key, sort_key)?; - Ok(TableRPC::ReadEntryResponse(value)) + Ok(TableRpc::ReadEntryResponse(value)) } - TableRPC::ReadRange(key, begin_sort_key, filter, limit) => { + TableRpc::ReadRange(key, begin_sort_key, filter, limit) => { let values = self.data.read_range(key, begin_sort_key, filter, *limit)?; - Ok(TableRPC::Update(values)) + Ok(TableRpc::Update(values)) } - TableRPC::Update(pairs) => { + TableRpc::Update(pairs) => { self.data.update_many(pairs)?; - Ok(TableRPC::Ok) + Ok(TableRpc::Ok) } - _ => Err(Error::BadRPC("Unexpected table RPC".to_string())), + _ => Err(Error::BadRpc("Unexpected table RPC".to_string())), } } } |