From f41583e1b731574b4bb13a20d4b3fd9fe3a899f5 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sat, 18 Apr 2020 19:21:34 +0200 Subject: Massive RPC refactoring --- src/table.rs | 122 +++++++++++++++++------------------------------------------ 1 file changed, 35 insertions(+), 87 deletions(-) (limited to 'src/table.rs') diff --git a/src/table.rs b/src/table.rs index 3ad08cff..f7354376 100644 --- a/src/table.rs +++ b/src/table.rs @@ -11,14 +11,15 @@ use serde_bytes::ByteBuf; use crate::data::*; use crate::error::Error; use crate::membership::System; -use crate::proto::*; use crate::rpc_client::*; +use crate::rpc_server::*; use crate::table_sync::*; pub struct Table { pub instance: F, pub name: String, + pub rpc_client: Arc>>, pub system: Arc, pub store: sled::Tree, @@ -35,24 +36,6 @@ pub struct TableReplicationParams { pub timeout: Duration, } -#[async_trait] -pub trait TableRpcHandler { - async fn handle(&self, rpc: &[u8]) -> Result, Error>; -} - -struct TableRpcHandlerAdapter { - table: Arc>, -} - -#[async_trait] -impl TableRpcHandler for TableRpcHandlerAdapter { - async fn handle(&self, rpc: &[u8]) -> Result, Error> { - let msg = rmp_serde::decode::from_read_ref::<_, TableRPC>(rpc)?; - let rep = self.table.handle(msg).await?; - Ok(rmp_to_vec_all_named(&rep)?) - } -} - #[derive(Serialize, Deserialize)] pub enum TableRPC { Ok, @@ -67,6 +50,8 @@ pub enum TableRPC { SyncRPC(SyncRPC), } +impl RpcMessage for TableRPC {} + pub trait PartitionKey { fn hash(&self) -> Hash; } @@ -136,18 +121,27 @@ impl Table { db: &sled::Db, name: String, param: TableReplicationParams, + rpc_server: &mut RpcServer, ) -> Arc { let store = db.open_tree(&name).expect("Unable to open DB tree"); + + let rpc_path = format!("table_{}", name); + let rpc_client = system.rpc_client::>(&rpc_path); + let table = Arc::new(Self { instance, name, + rpc_client, system, store, param, syncer: ArcSwapOption::from(None), }); + table.clone().register_handler(rpc_server, rpc_path); + let syncer = TableSyncer::launch(table.clone()).await; table.syncer.swap(Some(syncer)); + table } @@ -158,9 +152,10 @@ impl Table { //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); - let rpc = &TableRPC::::Update(vec![e_enc]); + let rpc = TableRPC::::Update(vec![e_enc]); - self.rpc_try_call_many(&who[..], &rpc, self.param.write_quorum) + self.rpc_client + .try_call_many(&who[..], rpc, self.param.write_quorum, self.param.timeout) .await?; Ok(()) } @@ -183,10 +178,8 @@ impl Table { let call_futures = call_list.drain().map(|(node, entries)| async move { let rpc = TableRPC::::Update(entries); - let rpc_bytes = rmp_to_vec_all_named(&rpc)?; - let message = Message::TableRPC(self.name.to_string(), rpc_bytes); - let resp = rpc_call(self.system.clone(), &node, &message, self.param.timeout).await?; + let resp = self.rpc_client.call(&node, rpc, self.param.timeout).await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::>(); @@ -214,9 +207,10 @@ impl Table { let who = ring.walk_ring(&hash, self.param.replication_factor); //eprintln!("get who: {:?}", who); - let rpc = &TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); + let rpc = TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self - .rpc_try_call_many(&who[..], &rpc, self.param.read_quorum) + .rpc_client + .try_call_many(&who[..], rpc, self.param.read_quorum, self.param.timeout) .await?; let mut ret = None; @@ -264,9 +258,10 @@ impl Table { let who = ring.walk_ring(&hash, self.param.replication_factor); let rpc = - &TableRPC::::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit); + TableRPC::::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit); let resps = self - .rpc_try_call_many(&who[..], &rpc, self.param.read_quorum) + .rpc_client + .try_call_many(&who[..], rpc, self.param.read_quorum, self.param.timeout) .await?; let mut ret = BTreeMap::new(); @@ -315,71 +310,24 @@ impl Table { async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); - self.rpc_try_call_many(&who[..], &TableRPC::::Update(vec![what_enc]), who.len()) + self.rpc_client + .try_call_many( + &who[..], + TableRPC::::Update(vec![what_enc]), + who.len(), + self.param.timeout, + ) .await?; Ok(()) } - async fn rpc_try_call_many( - &self, - who: &[UUID], - rpc: &TableRPC, - quorum: usize, - ) -> Result>, Error> { - //eprintln!("Table RPC to {:?}: {}", who, serde_json::to_string(&rpc)?); - - let rpc_bytes = rmp_to_vec_all_named(rpc)?; - let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes); - - let resps = rpc_try_call_many( - self.system.clone(), - who, - rpc_msg, - quorum, - self.param.timeout, - ) - .await?; - - let mut resps_vals = vec![]; - for resp in resps { - if let Message::TableRPC(tbl, rep_by) = &resp { - if *tbl == self.name { - resps_vals.push(rmp_serde::decode::from_read_ref(&rep_by)?); - continue; - } - } - return Err(Error::Message(format!( - "Invalid reply to TableRPC: {:?}", - resp - ))); - } - //eprintln!( - // "Table RPC responses: {}", - // serde_json::to_string(&resps_vals)? - //); - Ok(resps_vals) - } - - pub async fn rpc_call(&self, who: &UUID, rpc: &TableRPC) -> Result, Error> { - let rpc_bytes = rmp_to_vec_all_named(rpc)?; - let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes); - - let resp = rpc_call(self.system.clone(), who, &rpc_msg, self.param.timeout).await?; - if let Message::TableRPC(tbl, rep_by) = &resp { - if *tbl == self.name { - return Ok(rmp_serde::decode::from_read_ref(&rep_by)?); - } - } - Err(Error::Message(format!( - "Invalid reply to TableRPC: {:?}", - resp - ))) - } - // =============== HANDLERS FOR RPC OPERATIONS (SERVER SIDE) ============== - pub fn rpc_handler(self: Arc) -> Box { - Box::new(TableRpcHandlerAdapter:: { table: self }) + fn register_handler(self: Arc, rpc_server: &mut RpcServer, path: String) { + rpc_server.add_handler::, _, _>(path, move |msg, _addr| { + let self2 = self.clone(); + async move { self2.handle(msg).await } + }) } async fn handle(self: &Arc, msg: TableRPC) -> Result, Error> { -- cgit v1.2.3