diff options
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/Cargo.toml | 8 | ||||
-rw-r--r-- | src/table/data.rs | 2 | ||||
-rw-r--r-- | src/table/gc.rs | 80 | ||||
-rw-r--r-- | src/table/replication/fullcopy.rs | 13 | ||||
-rw-r--r-- | src/table/replication/parameters.rs | 6 | ||||
-rw-r--r-- | src/table/replication/sharded.rs | 7 | ||||
-rw-r--r-- | src/table/sync.rs | 97 | ||||
-rw-r--r-- | src/table/table.rs | 110 |
8 files changed, 186 insertions, 137 deletions
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index ccbd1748..616bf275 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_table" -version = "0.3.0" +version = "0.4.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -13,9 +13,10 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_rpc = { version = "0.3.0", path = "../rpc" } -garage_util = { version = "0.3.0", path = "../util" } +garage_rpc = { version = "0.4.0", path = "../rpc" } +garage_util = { version = "0.4.0", path = "../util" } +async-trait = "0.1.7" bytes = "1.0" hexdump = "0.1" log = "0.4" @@ -30,4 +31,3 @@ serde_bytes = "0.11" futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } - diff --git a/src/table/data.rs b/src/table/data.rs index e7e85e65..ffd494d5 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -9,7 +9,7 @@ use tokio::sync::Notify; use garage_util::data::*; use garage_util::error::*; -use garage_rpc::membership::System; +use garage_rpc::system::System; use crate::crdt::Crdt; use crate::replication::*; diff --git a/src/table/gc.rs b/src/table/gc.rs index 73e08827..c03648ef 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use async_trait::async_trait; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -13,9 +14,8 @@ use tokio::sync::watch; use garage_util::data::*; use garage_util::error::Error; -use garage_rpc::membership::System; -use garage_rpc::rpc_client::*; -use garage_rpc::rpc_server::*; +use garage_rpc::system::System; +use garage_rpc::*; use crate::data::*; use crate::replication::*; @@ -24,11 +24,11 @@ use crate::schema::*; const TABLE_GC_BATCH_SIZE: usize = 1024; const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); -pub struct TableGc<F: TableSchema, R: TableReplication> { +pub struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> { system: Arc<System>, data: Arc<TableData<F, R>>, - rpc_client: Arc<RpcClient<GcRpc>>, + endpoint: Arc<Endpoint<GcRpc, Self>>, } #[derive(Serialize, Deserialize)] @@ -36,30 +36,30 @@ enum GcRpc { Update(Vec<ByteBuf>), DeleteIfEqualHash(Vec<(ByteBuf, Hash)>), Ok, + Error(String), } -impl RpcMessage for GcRpc {} +impl Message for GcRpc { + type Response = GcRpc; +} impl<F, R> TableGc<F, R> where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch( - system: Arc<System>, - data: Arc<TableData<F, R>>, - rpc_server: &mut RpcServer, - ) -> Arc<Self> { - let rpc_path = format!("table_{}/gc", data.name); - let rpc_client = system.rpc_client::<GcRpc>(&rpc_path); + pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> { + let endpoint = system + .netapp + .endpoint(format!("garage_table/gc.rs/Rpc:{}", data.name)); let gc = Arc::new(Self { system: system.clone(), data: data.clone(), - rpc_client, + endpoint, }); - gc.register_handler(rpc_server, rpc_path); + gc.endpoint.set_handler(gc.clone()); let gc1 = gc.clone(); system.background.spawn_worker( @@ -168,7 +168,7 @@ where async fn try_send_and_delete( &self, - nodes: Vec<Uuid>, + nodes: Vec<NodeID>, items: Vec<(ByteBuf, Hash, ByteBuf)>, ) -> Result<(), Error> { let n_items = items.len(); @@ -180,11 +180,15 @@ where deletes.push((k, vhash)); } - self.rpc_client + self.system + .rpc .try_call_many( + &self.endpoint, &nodes[..], GcRpc::Update(updates), - RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND) + .with_quorum(nodes.len()) + .with_timeout(TABLE_GC_RPC_TIMEOUT), ) .await?; @@ -193,11 +197,15 @@ where self.data.name, n_items ); - self.rpc_client + self.system + .rpc .try_call_many( + &self.endpoint, &nodes[..], GcRpc::DeleteIfEqualHash(deletes.clone()), - RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND) + .with_quorum(nodes.len()) + .with_timeout(TABLE_GC_RPC_TIMEOUT), ) .await?; @@ -217,24 +225,7 @@ where Ok(()) } - // ---- RPC HANDLER ---- - - fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) { - let self2 = self.clone(); - rpc_server.add_handler::<GcRpc, _, _>(path, move |msg, _addr| { - let self2 = self2.clone(); - async move { self2.handle_rpc(&msg).await } - }); - - let self2 = self.clone(); - self.rpc_client - .set_local_handler(self.system.id, move |msg| { - let self2 = self2.clone(); - async move { self2.handle_rpc(&msg).await } - }); - } - - async fn handle_rpc(self: &Arc<Self>, message: &GcRpc) -> Result<GcRpc, Error> { + async fn handle_rpc(&self, message: &GcRpc) -> Result<GcRpc, Error> { match message { GcRpc::Update(items) => { self.data.update_many(items)?; @@ -251,3 +242,16 @@ where } } } + +#[async_trait] +impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> GcRpc { + self.handle_rpc(message) + .await + .unwrap_or_else(|e| GcRpc::Error(format!("{}", e))) + } +} diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index 3ce7c0bf..b41c5360 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -1,7 +1,8 @@ use std::sync::Arc; -use garage_rpc::membership::System; use garage_rpc::ring::*; +use garage_rpc::system::System; +use garage_rpc::NodeID; use garage_util::data::*; use crate::replication::*; @@ -19,16 +20,20 @@ pub struct TableFullReplication { } impl TableReplication for TableFullReplication { - fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> { + fn read_nodes(&self, _hash: &Hash) -> Vec<NodeID> { vec![self.system.id] } fn read_quorum(&self) -> usize { 1 } - fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> { + fn write_nodes(&self, _hash: &Hash) -> Vec<NodeID> { let ring = self.system.ring.borrow(); - ring.config.members.keys().cloned().collect::<Vec<_>>() + ring.config + .members + .keys() + .map(|id| NodeID::from_slice(id.as_slice()).unwrap()) + .collect::<Vec<_>>() } fn write_quorum(&self) -> usize { let nmembers = self.system.ring.borrow().config.members.len(); diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 64996828..7fdfce67 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -1,5 +1,5 @@ use garage_rpc::ring::*; - +use garage_rpc::NodeID; use garage_util::data::*; /// Trait to describe how a table shall be replicated @@ -8,12 +8,12 @@ pub trait TableReplication: Send + Sync { // To understand various replication methods /// Which nodes to send read requests to - fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>; + fn read_nodes(&self, hash: &Hash) -> Vec<NodeID>; /// Responses needed to consider a read succesfull fn read_quorum(&self) -> usize; /// Which nodes to send writes to - fn write_nodes(&self, hash: &Hash) -> Vec<Uuid>; + fn write_nodes(&self, hash: &Hash) -> Vec<NodeID>; /// Responses needed to consider a write succesfull fn write_quorum(&self) -> usize; fn max_write_errors(&self) -> usize; diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 8081b892..ffe686a5 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -1,7 +1,8 @@ use std::sync::Arc; -use garage_rpc::membership::System; use garage_rpc::ring::*; +use garage_rpc::system::System; +use garage_rpc::NodeID; use garage_util::data::*; use crate::replication::*; @@ -25,7 +26,7 @@ pub struct TableShardedReplication { } impl TableReplication for TableShardedReplication { - fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> { + fn read_nodes(&self, hash: &Hash) -> Vec<NodeID> { let ring = self.system.ring.borrow(); ring.get_nodes(&hash, self.replication_factor) } @@ -33,7 +34,7 @@ impl TableReplication for TableShardedReplication { self.read_quorum } - fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> { + fn write_nodes(&self, hash: &Hash) -> Vec<NodeID> { let ring = self.system.ring.borrow(); ring.get_nodes(&hash, self.replication_factor) } diff --git a/src/table/sync.rs b/src/table/sync.rs index a3afbbba..c5db0987 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -2,6 +2,7 @@ use std::collections::VecDeque; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; +use async_trait::async_trait; use futures::select; use futures_util::future::*; use futures_util::stream::*; @@ -13,10 +14,9 @@ use tokio::sync::{mpsc, watch}; use garage_util::data::*; use garage_util::error::Error; -use garage_rpc::membership::System; use garage_rpc::ring::*; -use garage_rpc::rpc_client::*; -use garage_rpc::rpc_server::*; +use garage_rpc::system::System; +use garage_rpc::*; use crate::data::*; use crate::merkle::*; @@ -28,13 +28,13 @@ const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30); // Do anti-entropy every 10 minutes const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); -pub struct TableSyncer<F: TableSchema, R: TableReplication> { +pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> { system: Arc<System>, data: Arc<TableData<F, R>>, merkle: Arc<MerkleUpdater<F, R>>, todo: Mutex<SyncTodo>, - rpc_client: Arc<RpcClient<SyncRpc>>, + endpoint: Arc<Endpoint<SyncRpc, Self>>, } #[derive(Serialize, Deserialize)] @@ -45,9 +45,12 @@ pub(crate) enum SyncRpc { Node(MerkleNodeKey, MerkleNode), Items(Vec<Arc<ByteBuf>>), Ok, + Error(String), } -impl RpcMessage for SyncRpc {} +impl Message for SyncRpc { + type Response = SyncRpc; +} struct SyncTodo { todo: Vec<TodoPartition>, @@ -72,10 +75,10 @@ where system: Arc<System>, data: Arc<TableData<F, R>>, merkle: Arc<MerkleUpdater<F, R>>, - rpc_server: &mut RpcServer, ) -> Arc<Self> { - let rpc_path = format!("table_{}/sync", data.name); - let rpc_client = system.rpc_client::<SyncRpc>(&rpc_path); + let endpoint = system + .netapp + .endpoint(format!("garage_table/sync.rs/Rpc:{}", data.name)); let todo = SyncTodo { todo: vec![] }; @@ -84,10 +87,10 @@ where data: data.clone(), merkle, todo: Mutex::new(todo), - rpc_client, + endpoint, }); - syncer.register_handler(rpc_server, rpc_path); + syncer.endpoint.set_handler(syncer.clone()); let (busy_tx, busy_rx) = mpsc::unbounded_channel(); @@ -112,21 +115,6 @@ where syncer } - fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) { - let self2 = self.clone(); - rpc_server.add_handler::<SyncRpc, _, _>(path, move |msg, _addr| { - let self2 = self2.clone(); - async move { self2.handle_rpc(&msg).await } - }); - - let self2 = self.clone(); - self.rpc_client - .set_local_handler(self.system.id, move |msg| { - let self2 = self2.clone(); - async move { self2.handle_rpc(&msg).await } - }); - } - async fn watcher_task( self: Arc<Self>, mut must_exit: watch::Receiver<bool>, @@ -317,15 +305,19 @@ where async fn offload_items( self: &Arc<Self>, items: &[(Vec<u8>, Arc<ByteBuf>)], - nodes: &[Uuid], + nodes: &[NodeID], ) -> Result<(), Error> { let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>(); - self.rpc_client + self.system + .rpc .try_call_many( + &self.endpoint, nodes, SyncRpc::Items(values), - RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND) + .with_quorum(nodes.len()) + .with_timeout(TABLE_SYNC_RPC_TIMEOUT), ) .await?; @@ -362,7 +354,7 @@ where async fn do_sync_with( self: Arc<Self>, partition: TodoPartition, - who: Uuid, + who: NodeID, must_exit: watch::Receiver<bool>, ) -> Result<(), Error> { let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?; @@ -378,11 +370,14 @@ where // Check if they have the same root checksum // If so, do nothing. let root_resp = self - .rpc_client + .system + .rpc .call( + &self.endpoint, who, SyncRpc::RootCkHash(partition.partition, root_ck_hash), - TABLE_SYNC_RPC_TIMEOUT, + RequestStrategy::with_priority(PRIO_BACKGROUND) + .with_timeout(TABLE_SYNC_RPC_TIMEOUT), ) .await?; @@ -430,8 +425,15 @@ where // Get Merkle node for this tree position at remote node // and compare it with local node let remote_node = match self - .rpc_client - .call(who, SyncRpc::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT) + .system + .rpc + .call( + &self.endpoint, + who, + SyncRpc::GetNode(key.clone()), + RequestStrategy::with_priority(PRIO_BACKGROUND) + .with_timeout(TABLE_SYNC_RPC_TIMEOUT), + ) .await? { SyncRpc::Node(_, node) => node, @@ -478,7 +480,7 @@ where Ok(()) } - async fn send_items(&self, who: Uuid, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> { + async fn send_items(&self, who: NodeID, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> { info!( "({}) Sending {} items to {:?}", self.data.name, @@ -492,8 +494,15 @@ where .collect::<Vec<_>>(); let rpc_resp = self - .rpc_client - .call(who, SyncRpc::Items(values), TABLE_SYNC_RPC_TIMEOUT) + .system + .rpc + .call( + &self.endpoint, + who, + SyncRpc::Items(values), + RequestStrategy::with_priority(PRIO_BACKGROUND) + .with_timeout(TABLE_SYNC_RPC_TIMEOUT), + ) .await?; if let SyncRpc::Ok = rpc_resp { Ok(()) @@ -506,7 +515,6 @@ where } // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== - async fn handle_rpc(self: &Arc<Self>, message: &SyncRpc) -> Result<SyncRpc, Error> { match message { SyncRpc::RootCkHash(range, h) => { @@ -527,6 +535,19 @@ where } } +#[async_trait] +impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> SyncRpc { + self.handle_rpc(message) + .await + .unwrap_or_else(|e| SyncRpc::Error(format!("{}", e))) + } +} + impl SyncTodo { fn add_full_sync<F: TableSchema, R: TableReplication>( &mut self, diff --git a/src/table/table.rs b/src/table/table.rs index eb9bd25c..ad263343 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -2,6 +2,7 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; +use async_trait::async_trait; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -9,9 +10,8 @@ use serde_bytes::ByteBuf; use garage_util::data::*; use garage_util::error::Error; -use garage_rpc::membership::System; -use garage_rpc::rpc_client::*; -use garage_rpc::rpc_server::*; +use garage_rpc::system::System; +use garage_rpc::*; use crate::crdt::Crdt; use crate::data::*; @@ -23,17 +23,18 @@ use crate::sync::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); -pub struct Table<F: TableSchema, R: TableReplication> { +pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> { pub system: Arc<System>, pub data: Arc<TableData<F, R>>, pub merkle_updater: Arc<MerkleUpdater<F, R>>, pub syncer: Arc<TableSyncer<F, R>>, - rpc_client: Arc<RpcClient<TableRpc<F>>>, + endpoint: Arc<Endpoint<TableRpc<F>, Self>>, } #[derive(Serialize, Deserialize)] pub(crate) enum TableRpc<F: TableSchema> { Ok, + Error(String), ReadEntry(F::P, F::S), ReadEntryResponse(Option<ByteBuf>), @@ -44,7 +45,9 @@ pub(crate) enum TableRpc<F: TableSchema> { Update(Vec<Arc<ByteBuf>>), } -impl<F: TableSchema> RpcMessage for TableRpc<F> {} +impl<F: TableSchema> Message for TableRpc<F> { + type Response = TableRpc<F>; +} impl<F, R> Table<F, R> where @@ -59,32 +62,27 @@ where system: Arc<System>, db: &sled::Db, name: String, - rpc_server: &mut RpcServer, ) -> Arc<Self> { - let rpc_path = format!("table_{}", name); - let rpc_client = system.rpc_client::<TableRpc<F>>(&rpc_path); + let endpoint = system + .netapp + .endpoint(format!("garage_table/table.rs/Rpc:{}", name)); let data = TableData::new(system.clone(), name, instance, replication, db); let merkle_updater = MerkleUpdater::launch(&system.background, data.clone()); - let syncer = TableSyncer::launch( - system.clone(), - data.clone(), - merkle_updater.clone(), - rpc_server, - ); - TableGc::launch(system.clone(), data.clone(), rpc_server); + let syncer = TableSyncer::launch(system.clone(), data.clone(), merkle_updater.clone()); + TableGc::launch(system.clone(), data.clone()); let table = Arc::new(Self { system, data, merkle_updater, syncer, - rpc_client, + endpoint, }); - table.clone().register_handler(rpc_server, rpc_path); + table.endpoint.set_handler(table.clone()); table } @@ -97,11 +95,14 @@ where let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = TableRpc::<F>::Update(vec![e_enc]); - self.rpc_client + self.system + .rpc .try_call_many( + &self.endpoint, &who[..], rpc, - RequestStrategy::with_quorum(self.data.replication.write_quorum()) + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(self.data.replication.write_quorum()) .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; @@ -123,7 +124,16 @@ where let call_futures = call_list.drain().map(|(node, entries)| async move { let rpc = TableRpc::<F>::Update(entries); - let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; + let resp = self + .system + .rpc + .call( + &self.endpoint, + node, + rpc, + RequestStrategy::with_priority(PRIO_NORMAL).with_timeout(TABLE_RPC_TIMEOUT), + ) + .await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::<FuturesUnordered<_>>(); @@ -152,11 +162,14 @@ where let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self - .rpc_client + .system + .rpc .try_call_many( + &self.endpoint, &who[..], rpc, - RequestStrategy::with_quorum(self.data.replication.read_quorum()) + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(self.data.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -208,11 +221,14 @@ where let rpc = TableRpc::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); let resps = self - .rpc_client + .system + .rpc .try_call_many( + &self.endpoint, &who[..], rpc, - RequestStrategy::with_quorum(self.data.replication.read_quorum()) + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(self.data.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -261,36 +277,25 @@ where // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS =============== - async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> { + async fn repair_on_read(&self, who: &[NodeID], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); - self.rpc_client + self.system + .rpc .try_call_many( + &self.endpoint, who, TableRpc::<F>::Update(vec![what_enc]), - RequestStrategy::with_quorum(who.len()).with_timeout(TABLE_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(who.len()) + .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; Ok(()) } - // =============== HANDLERS FOR RPC OPERATIONS (SERVER SIDE) ============== - - fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) { - let self2 = self.clone(); - rpc_server.add_handler::<TableRpc<F>, _, _>(path, move |msg, _addr| { - let self2 = self2.clone(); - async move { self2.handle(&msg).await } - }); - - let self2 = self.clone(); - self.rpc_client - .set_local_handler(self.system.id, move |msg| { - let self2 = self2.clone(); - async move { self2.handle(&msg).await } - }); - } - - async fn handle(self: &Arc<Self>, msg: &TableRpc<F>) -> Result<TableRpc<F>, Error> { + // ====== RPC HANDLER ===== + // + async fn handle_rpc(self: &Arc<Self>, msg: &TableRpc<F>) -> Result<TableRpc<F>, Error> { match msg { TableRpc::ReadEntry(key, sort_key) => { let value = self.data.read_entry(key, sort_key)?; @@ -308,3 +313,16 @@ where } } } + +#[async_trait] +impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + async fn handle(self: &Arc<Self>, msg: &TableRpc<F>, _from: NodeID) -> TableRpc<F> { + self.handle_rpc(msg) + .await + .unwrap_or_else(|e| TableRpc::<F>::Error(format!("{}", e))) + } +} |