diff options
Diffstat (limited to 'src/table/table.rs')
-rw-r--r-- | src/table/table.rs | 110 |
1 files changed, 64 insertions, 46 deletions
diff --git a/src/table/table.rs b/src/table/table.rs index eb9bd25c..ad263343 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -2,6 +2,7 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; +use async_trait::async_trait; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -9,9 +10,8 @@ use serde_bytes::ByteBuf; use garage_util::data::*; use garage_util::error::Error; -use garage_rpc::membership::System; -use garage_rpc::rpc_client::*; -use garage_rpc::rpc_server::*; +use garage_rpc::system::System; +use garage_rpc::*; use crate::crdt::Crdt; use crate::data::*; @@ -23,17 +23,18 @@ use crate::sync::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); -pub struct Table<F: TableSchema, R: TableReplication> { +pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> { pub system: Arc<System>, pub data: Arc<TableData<F, R>>, pub merkle_updater: Arc<MerkleUpdater<F, R>>, pub syncer: Arc<TableSyncer<F, R>>, - rpc_client: Arc<RpcClient<TableRpc<F>>>, + endpoint: Arc<Endpoint<TableRpc<F>, Self>>, } #[derive(Serialize, Deserialize)] pub(crate) enum TableRpc<F: TableSchema> { Ok, + Error(String), ReadEntry(F::P, F::S), ReadEntryResponse(Option<ByteBuf>), @@ -44,7 +45,9 @@ pub(crate) enum TableRpc<F: TableSchema> { Update(Vec<Arc<ByteBuf>>), } -impl<F: TableSchema> RpcMessage for TableRpc<F> {} +impl<F: TableSchema> Message for TableRpc<F> { + type Response = TableRpc<F>; +} impl<F, R> Table<F, R> where @@ -59,32 +62,27 @@ where system: Arc<System>, db: &sled::Db, name: String, - rpc_server: &mut RpcServer, ) -> Arc<Self> { - let rpc_path = format!("table_{}", name); - let rpc_client = system.rpc_client::<TableRpc<F>>(&rpc_path); + let endpoint = system + .netapp + .endpoint(format!("garage_table/table.rs/Rpc:{}", name)); let data = TableData::new(system.clone(), name, instance, replication, db); let merkle_updater = MerkleUpdater::launch(&system.background, data.clone()); - let syncer = TableSyncer::launch( - system.clone(), - data.clone(), - merkle_updater.clone(), - rpc_server, - ); - TableGc::launch(system.clone(), data.clone(), rpc_server); + let syncer = TableSyncer::launch(system.clone(), data.clone(), merkle_updater.clone()); + TableGc::launch(system.clone(), data.clone()); let table = Arc::new(Self { system, data, merkle_updater, syncer, - rpc_client, + endpoint, }); - table.clone().register_handler(rpc_server, rpc_path); + table.endpoint.set_handler(table.clone()); table } @@ -97,11 +95,14 @@ where let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = TableRpc::<F>::Update(vec![e_enc]); - self.rpc_client + self.system + .rpc .try_call_many( + &self.endpoint, &who[..], rpc, - RequestStrategy::with_quorum(self.data.replication.write_quorum()) + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(self.data.replication.write_quorum()) .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; @@ -123,7 +124,16 @@ where let call_futures = call_list.drain().map(|(node, entries)| async move { let rpc = TableRpc::<F>::Update(entries); - let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; + let resp = self + .system + .rpc + .call( + &self.endpoint, + node, + rpc, + RequestStrategy::with_priority(PRIO_NORMAL).with_timeout(TABLE_RPC_TIMEOUT), + ) + .await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::<FuturesUnordered<_>>(); @@ -152,11 +162,14 @@ where let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self - .rpc_client + .system + .rpc .try_call_many( + &self.endpoint, &who[..], rpc, - RequestStrategy::with_quorum(self.data.replication.read_quorum()) + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(self.data.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -208,11 +221,14 @@ where let rpc = TableRpc::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); let resps = self - .rpc_client + .system + .rpc .try_call_many( + &self.endpoint, &who[..], rpc, - RequestStrategy::with_quorum(self.data.replication.read_quorum()) + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(self.data.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -261,36 +277,25 @@ where // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS =============== - async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> { + async fn repair_on_read(&self, who: &[NodeID], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); - self.rpc_client + self.system + .rpc .try_call_many( + &self.endpoint, who, TableRpc::<F>::Update(vec![what_enc]), - RequestStrategy::with_quorum(who.len()).with_timeout(TABLE_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(who.len()) + .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; Ok(()) } - // =============== HANDLERS FOR RPC OPERATIONS (SERVER SIDE) ============== - - fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) { - let self2 = self.clone(); - rpc_server.add_handler::<TableRpc<F>, _, _>(path, move |msg, _addr| { - let self2 = self2.clone(); - async move { self2.handle(&msg).await } - }); - - let self2 = self.clone(); - self.rpc_client - .set_local_handler(self.system.id, move |msg| { - let self2 = self2.clone(); - async move { self2.handle(&msg).await } - }); - } - - async fn handle(self: &Arc<Self>, msg: &TableRpc<F>) -> Result<TableRpc<F>, Error> { + // ====== RPC HANDLER ===== + // + async fn handle_rpc(self: &Arc<Self>, msg: &TableRpc<F>) -> Result<TableRpc<F>, Error> { match msg { TableRpc::ReadEntry(key, sort_key) => { let value = self.data.read_entry(key, sort_key)?; @@ -308,3 +313,16 @@ where } } } + +#[async_trait] +impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + async fn handle(self: &Arc<Self>, msg: &TableRpc<F>, _from: NodeID) -> TableRpc<F> { + self.handle_rpc(msg) + .await + .unwrap_or_else(|e| TableRpc::<F>::Error(format!("{}", e))) + } +} |