aboutsummaryrefslogtreecommitdiff
path: root/src/table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table.rs')
-rw-r--r--src/table.rs122
1 files changed, 35 insertions, 87 deletions
diff --git a/src/table.rs b/src/table.rs
index 3ad08cff..f7354376 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -11,14 +11,15 @@ use serde_bytes::ByteBuf;
use crate::data::*;
use crate::error::Error;
use crate::membership::System;
-use crate::proto::*;
use crate::rpc_client::*;
+use crate::rpc_server::*;
use crate::table_sync::*;
pub struct Table<F: TableSchema> {
pub instance: F,
pub name: String,
+ pub rpc_client: Arc<RpcClient<TableRPC<F>>>,
pub system: Arc<System>,
pub store: sled::Tree,
@@ -35,24 +36,6 @@ pub struct TableReplicationParams {
pub timeout: Duration,
}
-#[async_trait]
-pub trait TableRpcHandler {
- async fn handle(&self, rpc: &[u8]) -> Result<Vec<u8>, Error>;
-}
-
-struct TableRpcHandlerAdapter<F: TableSchema> {
- table: Arc<Table<F>>,
-}
-
-#[async_trait]
-impl<F: TableSchema + 'static> TableRpcHandler for TableRpcHandlerAdapter<F> {
- async fn handle(&self, rpc: &[u8]) -> Result<Vec<u8>, Error> {
- let msg = rmp_serde::decode::from_read_ref::<_, TableRPC<F>>(rpc)?;
- let rep = self.table.handle(msg).await?;
- Ok(rmp_to_vec_all_named(&rep)?)
- }
-}
-
#[derive(Serialize, Deserialize)]
pub enum TableRPC<F: TableSchema> {
Ok,
@@ -67,6 +50,8 @@ pub enum TableRPC<F: TableSchema> {
SyncRPC(SyncRPC),
}
+impl<F: TableSchema> RpcMessage for TableRPC<F> {}
+
pub trait PartitionKey {
fn hash(&self) -> Hash;
}
@@ -136,18 +121,27 @@ impl<F: TableSchema + 'static> Table<F> {
db: &sled::Db,
name: String,
param: TableReplicationParams,
+ rpc_server: &mut RpcServer,
) -> Arc<Self> {
let store = db.open_tree(&name).expect("Unable to open DB tree");
+
+ let rpc_path = format!("table_{}", name);
+ let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
+
let table = Arc::new(Self {
instance,
name,
+ rpc_client,
system,
store,
param,
syncer: ArcSwapOption::from(None),
});
+ table.clone().register_handler(rpc_server, rpc_path);
+
let syncer = TableSyncer::launch(table.clone()).await;
table.syncer.swap(Some(syncer));
+
table
}
@@ -158,9 +152,10 @@ impl<F: TableSchema + 'static> Table<F> {
//eprintln!("insert who: {:?}", who);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
- let rpc = &TableRPC::<F>::Update(vec![e_enc]);
+ let rpc = TableRPC::<F>::Update(vec![e_enc]);
- self.rpc_try_call_many(&who[..], &rpc, self.param.write_quorum)
+ self.rpc_client
+ .try_call_many(&who[..], rpc, self.param.write_quorum, self.param.timeout)
.await?;
Ok(())
}
@@ -183,10 +178,8 @@ impl<F: TableSchema + 'static> Table<F> {
let call_futures = call_list.drain().map(|(node, entries)| async move {
let rpc = TableRPC::<F>::Update(entries);
- let rpc_bytes = rmp_to_vec_all_named(&rpc)?;
- let message = Message::TableRPC(self.name.to_string(), rpc_bytes);
- let resp = rpc_call(self.system.clone(), &node, &message, self.param.timeout).await?;
+ let resp = self.rpc_client.call(&node, rpc, self.param.timeout).await?;
Ok::<_, Error>((node, resp))
});
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
@@ -214,9 +207,10 @@ impl<F: TableSchema + 'static> Table<F> {
let who = ring.walk_ring(&hash, self.param.replication_factor);
//eprintln!("get who: {:?}", who);
- let rpc = &TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
+ let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self
- .rpc_try_call_many(&who[..], &rpc, self.param.read_quorum)
+ .rpc_client
+ .try_call_many(&who[..], rpc, self.param.read_quorum, self.param.timeout)
.await?;
let mut ret = None;
@@ -264,9 +258,10 @@ impl<F: TableSchema + 'static> Table<F> {
let who = ring.walk_ring(&hash, self.param.replication_factor);
let rpc =
- &TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit);
+ TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit);
let resps = self
- .rpc_try_call_many(&who[..], &rpc, self.param.read_quorum)
+ .rpc_client
+ .try_call_many(&who[..], rpc, self.param.read_quorum, self.param.timeout)
.await?;
let mut ret = BTreeMap::new();
@@ -315,71 +310,24 @@ impl<F: TableSchema + 'static> Table<F> {
async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
- self.rpc_try_call_many(&who[..], &TableRPC::<F>::Update(vec![what_enc]), who.len())
+ self.rpc_client
+ .try_call_many(
+ &who[..],
+ TableRPC::<F>::Update(vec![what_enc]),
+ who.len(),
+ self.param.timeout,
+ )
.await?;
Ok(())
}
- async fn rpc_try_call_many(
- &self,
- who: &[UUID],
- rpc: &TableRPC<F>,
- quorum: usize,
- ) -> Result<Vec<TableRPC<F>>, Error> {
- //eprintln!("Table RPC to {:?}: {}", who, serde_json::to_string(&rpc)?);
-
- let rpc_bytes = rmp_to_vec_all_named(rpc)?;
- let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes);
-
- let resps = rpc_try_call_many(
- self.system.clone(),
- who,
- rpc_msg,
- quorum,
- self.param.timeout,
- )
- .await?;
-
- let mut resps_vals = vec![];
- for resp in resps {
- if let Message::TableRPC(tbl, rep_by) = &resp {
- if *tbl == self.name {
- resps_vals.push(rmp_serde::decode::from_read_ref(&rep_by)?);
- continue;
- }
- }
- return Err(Error::Message(format!(
- "Invalid reply to TableRPC: {:?}",
- resp
- )));
- }
- //eprintln!(
- // "Table RPC responses: {}",
- // serde_json::to_string(&resps_vals)?
- //);
- Ok(resps_vals)
- }
-
- pub async fn rpc_call(&self, who: &UUID, rpc: &TableRPC<F>) -> Result<TableRPC<F>, Error> {
- let rpc_bytes = rmp_to_vec_all_named(rpc)?;
- let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes);
-
- let resp = rpc_call(self.system.clone(), who, &rpc_msg, self.param.timeout).await?;
- if let Message::TableRPC(tbl, rep_by) = &resp {
- if *tbl == self.name {
- return Ok(rmp_serde::decode::from_read_ref(&rep_by)?);
- }
- }
- Err(Error::Message(format!(
- "Invalid reply to TableRPC: {:?}",
- resp
- )))
- }
-
// =============== HANDLERS FOR RPC OPERATIONS (SERVER SIDE) ==============
- pub fn rpc_handler(self: Arc<Self>) -> Box<dyn TableRpcHandler + Send + Sync> {
- Box::new(TableRpcHandlerAdapter::<F> { table: self })
+ fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) {
+ rpc_server.add_handler::<TableRPC<F>, _, _>(path, move |msg, _addr| {
+ let self2 = self.clone();
+ async move { self2.handle(msg).await }
+ })
}
async fn handle(self: &Arc<Self>, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> {