diff options
Diffstat (limited to 'src/table.rs')
-rw-r--r-- | src/table.rs | 122 |
1 files changed, 35 insertions, 87 deletions
diff --git a/src/table.rs b/src/table.rs index 3ad08cff..f7354376 100644 --- a/src/table.rs +++ b/src/table.rs @@ -11,14 +11,15 @@ use serde_bytes::ByteBuf; use crate::data::*; use crate::error::Error; use crate::membership::System; -use crate::proto::*; use crate::rpc_client::*; +use crate::rpc_server::*; use crate::table_sync::*; pub struct Table<F: TableSchema> { pub instance: F, pub name: String, + pub rpc_client: Arc<RpcClient<TableRPC<F>>>, pub system: Arc<System>, pub store: sled::Tree, @@ -35,24 +36,6 @@ pub struct TableReplicationParams { pub timeout: Duration, } -#[async_trait] -pub trait TableRpcHandler { - async fn handle(&self, rpc: &[u8]) -> Result<Vec<u8>, Error>; -} - -struct TableRpcHandlerAdapter<F: TableSchema> { - table: Arc<Table<F>>, -} - -#[async_trait] -impl<F: TableSchema + 'static> TableRpcHandler for TableRpcHandlerAdapter<F> { - async fn handle(&self, rpc: &[u8]) -> Result<Vec<u8>, Error> { - let msg = rmp_serde::decode::from_read_ref::<_, TableRPC<F>>(rpc)?; - let rep = self.table.handle(msg).await?; - Ok(rmp_to_vec_all_named(&rep)?) - } -} - #[derive(Serialize, Deserialize)] pub enum TableRPC<F: TableSchema> { Ok, @@ -67,6 +50,8 @@ pub enum TableRPC<F: TableSchema> { SyncRPC(SyncRPC), } +impl<F: TableSchema> RpcMessage for TableRPC<F> {} + pub trait PartitionKey { fn hash(&self) -> Hash; } @@ -136,18 +121,27 @@ impl<F: TableSchema + 'static> Table<F> { db: &sled::Db, name: String, param: TableReplicationParams, + rpc_server: &mut RpcServer, ) -> Arc<Self> { let store = db.open_tree(&name).expect("Unable to open DB tree"); + + let rpc_path = format!("table_{}", name); + let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path); + let table = Arc::new(Self { instance, name, + rpc_client, system, store, param, syncer: ArcSwapOption::from(None), }); + table.clone().register_handler(rpc_server, rpc_path); + let syncer = TableSyncer::launch(table.clone()).await; table.syncer.swap(Some(syncer)); + table } @@ -158,9 +152,10 @@ impl<F: TableSchema + 'static> Table<F> { //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); - let rpc = &TableRPC::<F>::Update(vec![e_enc]); + let rpc = TableRPC::<F>::Update(vec![e_enc]); - self.rpc_try_call_many(&who[..], &rpc, self.param.write_quorum) + self.rpc_client + .try_call_many(&who[..], rpc, self.param.write_quorum, self.param.timeout) .await?; Ok(()) } @@ -183,10 +178,8 @@ impl<F: TableSchema + 'static> Table<F> { let call_futures = call_list.drain().map(|(node, entries)| async move { let rpc = TableRPC::<F>::Update(entries); - let rpc_bytes = rmp_to_vec_all_named(&rpc)?; - let message = Message::TableRPC(self.name.to_string(), rpc_bytes); - let resp = rpc_call(self.system.clone(), &node, &message, self.param.timeout).await?; + let resp = self.rpc_client.call(&node, rpc, self.param.timeout).await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::<FuturesUnordered<_>>(); @@ -214,9 +207,10 @@ impl<F: TableSchema + 'static> Table<F> { let who = ring.walk_ring(&hash, self.param.replication_factor); //eprintln!("get who: {:?}", who); - let rpc = &TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); + let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self - .rpc_try_call_many(&who[..], &rpc, self.param.read_quorum) + .rpc_client + .try_call_many(&who[..], rpc, self.param.read_quorum, self.param.timeout) .await?; let mut ret = None; @@ -264,9 +258,10 @@ impl<F: TableSchema + 'static> Table<F> { let who = ring.walk_ring(&hash, self.param.replication_factor); let rpc = - &TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit); + TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit); let resps = self - .rpc_try_call_many(&who[..], &rpc, self.param.read_quorum) + .rpc_client + .try_call_many(&who[..], rpc, self.param.read_quorum, self.param.timeout) .await?; let mut ret = BTreeMap::new(); @@ -315,71 +310,24 @@ impl<F: TableSchema + 'static> Table<F> { async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); - self.rpc_try_call_many(&who[..], &TableRPC::<F>::Update(vec![what_enc]), who.len()) + self.rpc_client + .try_call_many( + &who[..], + TableRPC::<F>::Update(vec![what_enc]), + who.len(), + self.param.timeout, + ) .await?; Ok(()) } - async fn rpc_try_call_many( - &self, - who: &[UUID], - rpc: &TableRPC<F>, - quorum: usize, - ) -> Result<Vec<TableRPC<F>>, Error> { - //eprintln!("Table RPC to {:?}: {}", who, serde_json::to_string(&rpc)?); - - let rpc_bytes = rmp_to_vec_all_named(rpc)?; - let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes); - - let resps = rpc_try_call_many( - self.system.clone(), - who, - rpc_msg, - quorum, - self.param.timeout, - ) - .await?; - - let mut resps_vals = vec![]; - for resp in resps { - if let Message::TableRPC(tbl, rep_by) = &resp { - if *tbl == self.name { - resps_vals.push(rmp_serde::decode::from_read_ref(&rep_by)?); - continue; - } - } - return Err(Error::Message(format!( - "Invalid reply to TableRPC: {:?}", - resp - ))); - } - //eprintln!( - // "Table RPC responses: {}", - // serde_json::to_string(&resps_vals)? - //); - Ok(resps_vals) - } - - pub async fn rpc_call(&self, who: &UUID, rpc: &TableRPC<F>) -> Result<TableRPC<F>, Error> { - let rpc_bytes = rmp_to_vec_all_named(rpc)?; - let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes); - - let resp = rpc_call(self.system.clone(), who, &rpc_msg, self.param.timeout).await?; - if let Message::TableRPC(tbl, rep_by) = &resp { - if *tbl == self.name { - return Ok(rmp_serde::decode::from_read_ref(&rep_by)?); - } - } - Err(Error::Message(format!( - "Invalid reply to TableRPC: {:?}", - resp - ))) - } - // =============== HANDLERS FOR RPC OPERATIONS (SERVER SIDE) ============== - pub fn rpc_handler(self: Arc<Self>) -> Box<dyn TableRpcHandler + Send + Sync> { - Box::new(TableRpcHandlerAdapter::<F> { table: self }) + fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) { + rpc_server.add_handler::<TableRPC<F>, _, _>(path, move |msg, _addr| { + let self2 = self.clone(); + async move { self2.handle(msg).await } + }) } async fn handle(self: &Arc<Self>, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> { |