aboutsummaryrefslogtreecommitdiff
path: root/src/table/sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r--src/table/sync.rs46
1 files changed, 23 insertions, 23 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 33b01455..a3afbbba 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -34,11 +34,11 @@ pub struct TableSyncer<F: TableSchema, R: TableReplication> {
merkle: Arc<MerkleUpdater<F, R>>,
todo: Mutex<SyncTodo>,
- rpc_client: Arc<RpcClient<SyncRPC>>,
+ rpc_client: Arc<RpcClient<SyncRpc>>,
}
#[derive(Serialize, Deserialize)]
-pub(crate) enum SyncRPC {
+pub(crate) enum SyncRpc {
RootCkHash(Partition, Hash),
RootCkDifferent(bool),
GetNode(MerkleNodeKey),
@@ -47,7 +47,7 @@ pub(crate) enum SyncRPC {
Ok,
}
-impl RpcMessage for SyncRPC {}
+impl RpcMessage for SyncRpc {}
struct SyncTodo {
todo: Vec<TodoPartition>,
@@ -75,7 +75,7 @@ where
rpc_server: &mut RpcServer,
) -> Arc<Self> {
let rpc_path = format!("table_{}/sync", data.name);
- let rpc_client = system.rpc_client::<SyncRPC>(&rpc_path);
+ let rpc_client = system.rpc_client::<SyncRpc>(&rpc_path);
let todo = SyncTodo { todo: vec![] };
@@ -114,7 +114,7 @@ where
fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
let self2 = self.clone();
- rpc_server.add_handler::<SyncRPC, _, _>(path, move |msg, _addr| {
+ rpc_server.add_handler::<SyncRpc, _, _>(path, move |msg, _addr| {
let self2 = self2.clone();
async move { self2.handle_rpc(&msg).await }
});
@@ -317,14 +317,14 @@ where
async fn offload_items(
self: &Arc<Self>,
items: &[(Vec<u8>, Arc<ByteBuf>)],
- nodes: &[UUID],
+ nodes: &[Uuid],
) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
self.rpc_client
.try_call_many(
nodes,
- SyncRPC::Items(values),
+ SyncRpc::Items(values),
RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT),
)
.await?;
@@ -362,7 +362,7 @@ where
async fn do_sync_with(
self: Arc<Self>,
partition: TodoPartition,
- who: UUID,
+ who: Uuid,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?;
@@ -381,20 +381,20 @@ where
.rpc_client
.call(
who,
- SyncRPC::RootCkHash(partition.partition, root_ck_hash),
+ SyncRpc::RootCkHash(partition.partition, root_ck_hash),
TABLE_SYNC_RPC_TIMEOUT,
)
.await?;
let mut todo = match root_resp {
- SyncRPC::RootCkDifferent(false) => {
+ SyncRpc::RootCkDifferent(false) => {
debug!(
"({}) Sync {:?} with {:?}: no difference",
self.data.name, partition, who
);
return Ok(());
}
- SyncRPC::RootCkDifferent(true) => VecDeque::from(vec![root_ck_key]),
+ SyncRpc::RootCkDifferent(true) => VecDeque::from(vec![root_ck_key]),
x => {
return Err(Error::Message(format!(
"Invalid respone to RootCkHash RPC: {}",
@@ -431,10 +431,10 @@ where
// and compare it with local node
let remote_node = match self
.rpc_client
- .call(who, SyncRPC::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT)
+ .call(who, SyncRpc::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT)
.await?
{
- SyncRPC::Node(_, node) => node,
+ SyncRpc::Node(_, node) => node,
x => {
return Err(Error::Message(format!(
"Invalid respone to GetNode RPC: {}",
@@ -478,7 +478,7 @@ where
Ok(())
}
- async fn send_items(&self, who: UUID, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
+ async fn send_items(&self, who: Uuid, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
info!(
"({}) Sending {} items to {:?}",
self.data.name,
@@ -493,9 +493,9 @@ where
let rpc_resp = self
.rpc_client
- .call(who, SyncRPC::Items(values), TABLE_SYNC_RPC_TIMEOUT)
+ .call(who, SyncRpc::Items(values), TABLE_SYNC_RPC_TIMEOUT)
.await?;
- if let SyncRPC::Ok = rpc_resp {
+ if let SyncRpc::Ok = rpc_resp {
Ok(())
} else {
Err(Error::Message(format!(
@@ -507,20 +507,20 @@ where
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
- async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> {
+ async fn handle_rpc(self: &Arc<Self>, message: &SyncRpc) -> Result<SyncRpc, Error> {
match message {
- SyncRPC::RootCkHash(range, h) => {
+ SyncRpc::RootCkHash(range, h) => {
let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
let hash = hash_of::<MerkleNode>(&root_ck)?;
- Ok(SyncRPC::RootCkDifferent(hash != *h))
+ Ok(SyncRpc::RootCkDifferent(hash != *h))
}
- SyncRPC::GetNode(k) => {
+ SyncRpc::GetNode(k) => {
let node = self.merkle.read_node(&k)?;
- Ok(SyncRPC::Node(k.clone(), node))
+ Ok(SyncRpc::Node(k.clone(), node))
}
- SyncRPC::Items(items) => {
+ SyncRpc::Items(items) => {
self.data.update_many(items)?;
- Ok(SyncRPC::Ok)
+ Ok(SyncRpc::Ok)
}
_ => Err(Error::Message("Unexpected sync RPC".to_string())),
}