aboutsummaryrefslogtreecommitdiff
path: root/src/table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table.rs')
-rw-r--r--src/table.rs94
1 files changed, 61 insertions, 33 deletions
diff --git a/src/table.rs b/src/table.rs
index def0d8b8..9ba9d94a 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -1,15 +1,14 @@
-use std::time::Duration;
-use std::sync::Arc;
-use serde::{Serialize, Deserialize};
use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use std::sync::Arc;
+use std::time::Duration;
-use crate::error::Error;
-use crate::proto::*;
use crate::data::*;
+use crate::error::Error;
use crate::membership::System;
+use crate::proto::*;
use crate::rpc_client::*;
-
pub struct Table<F: TableFormat> {
pub instance: F,
@@ -72,7 +71,9 @@ pub trait SortKey {
fn sort_key(&self) -> &[u8];
}
-pub trait Entry<P: PartitionKey, S: SortKey>: PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync {
+pub trait Entry<P: PartitionKey, S: SortKey>:
+ PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
+{
fn partition_key(&self) -> &P;
fn sort_key(&self) -> &S;
@@ -114,10 +115,15 @@ pub trait TableFormat: Send + Sync {
}
impl<F: TableFormat + 'static> Table<F> {
- pub fn new(instance: F, system: Arc<System>, db: &sled::Db, name: String, param: TableReplicationParams) -> Self {
- let store = db.open_tree(&name)
- .expect("Unable to open DB tree");
- Self{
+ pub fn new(
+ instance: F,
+ system: Arc<System>,
+ db: &sled::Db,
+ name: String,
+ param: TableReplicationParams,
+ ) -> Self {
+ let store = db.open_tree(&name).expect("Unable to open DB tree");
+ Self {
instance,
name,
system,
@@ -128,33 +134,39 @@ impl<F: TableFormat + 'static> Table<F> {
}
pub fn rpc_handler(self: Arc<Self>) -> Box<dyn TableRpcHandler + Send + Sync> {
- Box::new(TableRpcHandlerAdapter::<F>{ table: self })
+ Box::new(TableRpcHandlerAdapter::<F> { table: self })
}
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
- let who = self.system.members.read().await
+ let who = self
+ .system
+ .members
+ .read()
+ .await
.walk_ring(&hash, self.param.replication_factor);
eprintln!("insert who: {:?}", who);
let rpc = &TableRPC::<F>::Update(vec![e.clone()]);
-
- self.rpc_try_call_many(&who[..],
- &rpc,
- self.param.write_quorum).await?;
+
+ self.rpc_try_call_many(&who[..], &rpc, self.param.write_quorum)
+ .await?;
Ok(())
}
pub async fn get(&self, partition_key: &F::P, sort_key: &F::S) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash();
- let who = self.system.members.read().await
+ let who = self
+ .system
+ .members
+ .read()
+ .await
.walk_ring(&hash, self.param.replication_factor);
eprintln!("get who: {:?}", who);
let rpc = &TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
- let resps = self.rpc_try_call_many(&who[..],
- &rpc,
- self.param.read_quorum)
+ let resps = self
+ .rpc_try_call_many(&who[..], &rpc, self.param.read_quorum)
.await?;
let mut ret = None;
@@ -180,27 +192,37 @@ impl<F: TableFormat + 'static> Table<F> {
if let Some(ret_entry) = &ret {
if not_all_same {
// Repair on read
- let _: Result<_, _> = self.rpc_try_call_many(
+ let _: Result<_, _> = self
+ .rpc_try_call_many(
&who[..],
&TableRPC::<F>::Update(vec![ret_entry.clone()]),
- who.len())
+ who.len(),
+ )
.await;
}
}
Ok(ret)
}
- async fn rpc_try_call_many(&self, who: &[UUID], rpc: &TableRPC<F>, quorum: usize) -> Result<Vec<TableRPC<F>>, Error> {
+ async fn rpc_try_call_many(
+ &self,
+ who: &[UUID],
+ rpc: &TableRPC<F>,
+ quorum: usize,
+ ) -> Result<Vec<TableRPC<F>>, Error> {
eprintln!("Table RPC to {:?}: {}", who, serde_json::to_string(&rpc)?);
let rpc_bytes = rmp_to_vec_all_named(rpc)?;
let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes);
- let resps = rpc_try_call_many(self.system.clone(),
- who,
- &rpc_msg,
- quorum,
- self.param.timeout).await?;
+ let resps = rpc_try_call_many(
+ self.system.clone(),
+ who,
+ &rpc_msg,
+ quorum,
+ self.param.timeout,
+ )
+ .await?;
let mut resps_vals = vec![];
for resp in resps {
@@ -210,9 +232,15 @@ impl<F: TableFormat + 'static> Table<F> {
continue;
}
}
- return Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp)))
+ return Err(Error::Message(format!(
+ "Invalid reply to TableRPC: {:?}",
+ resp
+ )));
}
- eprintln!("Table RPC responses: {}", serde_json::to_string(&resps_vals)?);
+ eprintln!(
+ "Table RPC responses: {}",
+ serde_json::to_string(&resps_vals)?
+ );
Ok(resps_vals)
}
@@ -226,7 +254,7 @@ impl<F: TableFormat + 'static> Table<F> {
self.handle_update(pairs).await?;
Ok(TableRPC::Ok)
}
- _ => Err(Error::RPCError(format!("Unexpected table RPC")))
+ _ => Err(Error::RPCError(format!("Unexpected table RPC"))),
}
}
@@ -254,7 +282,7 @@ impl<F: TableFormat + 'static> Table<F> {
new_entry.merge(&update);
(Some(old_entry), new_entry)
}
- None => (None, update.clone())
+ None => (None, update.clone()),
};
let new_bytes = rmp_to_vec_all_named(&new_entry)