diff options
Diffstat (limited to 'src/table.rs')
-rw-r--r-- | src/table.rs | 94 |
1 files changed, 61 insertions, 33 deletions
diff --git a/src/table.rs b/src/table.rs index def0d8b8..9ba9d94a 100644 --- a/src/table.rs +++ b/src/table.rs @@ -1,15 +1,14 @@ -use std::time::Duration; -use std::sync::Arc; -use serde::{Serialize, Deserialize}; use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use std::time::Duration; -use crate::error::Error; -use crate::proto::*; use crate::data::*; +use crate::error::Error; use crate::membership::System; +use crate::proto::*; use crate::rpc_client::*; - pub struct Table<F: TableFormat> { pub instance: F, @@ -72,7 +71,9 @@ pub trait SortKey { fn sort_key(&self) -> &[u8]; } -pub trait Entry<P: PartitionKey, S: SortKey>: PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { +pub trait Entry<P: PartitionKey, S: SortKey>: + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync +{ fn partition_key(&self) -> &P; fn sort_key(&self) -> &S; @@ -114,10 +115,15 @@ pub trait TableFormat: Send + Sync { } impl<F: TableFormat + 'static> Table<F> { - pub fn new(instance: F, system: Arc<System>, db: &sled::Db, name: String, param: TableReplicationParams) -> Self { - let store = db.open_tree(&name) - .expect("Unable to open DB tree"); - Self{ + pub fn new( + instance: F, + system: Arc<System>, + db: &sled::Db, + name: String, + param: TableReplicationParams, + ) -> Self { + let store = db.open_tree(&name).expect("Unable to open DB tree"); + Self { instance, name, system, @@ -128,33 +134,39 @@ impl<F: TableFormat + 'static> Table<F> { } pub fn rpc_handler(self: Arc<Self>) -> Box<dyn TableRpcHandler + Send + Sync> { - Box::new(TableRpcHandlerAdapter::<F>{ table: self }) + Box::new(TableRpcHandlerAdapter::<F> { table: self }) } pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let who = self.system.members.read().await + let who = self + .system + .members + .read() + .await .walk_ring(&hash, self.param.replication_factor); eprintln!("insert who: {:?}", who); let rpc = &TableRPC::<F>::Update(vec![e.clone()]); - - self.rpc_try_call_many(&who[..], - &rpc, - self.param.write_quorum).await?; + + self.rpc_try_call_many(&who[..], &rpc, self.param.write_quorum) + .await?; Ok(()) } pub async fn get(&self, partition_key: &F::P, sort_key: &F::S) -> Result<Option<F::E>, Error> { let hash = partition_key.hash(); - let who = self.system.members.read().await + let who = self + .system + .members + .read() + .await .walk_ring(&hash, self.param.replication_factor); eprintln!("get who: {:?}", who); let rpc = &TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); - let resps = self.rpc_try_call_many(&who[..], - &rpc, - self.param.read_quorum) + let resps = self + .rpc_try_call_many(&who[..], &rpc, self.param.read_quorum) .await?; let mut ret = None; @@ -180,27 +192,37 @@ impl<F: TableFormat + 'static> Table<F> { if let Some(ret_entry) = &ret { if not_all_same { // Repair on read - let _: Result<_, _> = self.rpc_try_call_many( + let _: Result<_, _> = self + .rpc_try_call_many( &who[..], &TableRPC::<F>::Update(vec![ret_entry.clone()]), - who.len()) + who.len(), + ) .await; } } Ok(ret) } - async fn rpc_try_call_many(&self, who: &[UUID], rpc: &TableRPC<F>, quorum: usize) -> Result<Vec<TableRPC<F>>, Error> { + async fn rpc_try_call_many( + &self, + who: &[UUID], + rpc: &TableRPC<F>, + quorum: usize, + ) -> Result<Vec<TableRPC<F>>, Error> { eprintln!("Table RPC to {:?}: {}", who, serde_json::to_string(&rpc)?); let rpc_bytes = rmp_to_vec_all_named(rpc)?; let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes); - let resps = rpc_try_call_many(self.system.clone(), - who, - &rpc_msg, - quorum, - self.param.timeout).await?; + let resps = rpc_try_call_many( + self.system.clone(), + who, + &rpc_msg, + quorum, + self.param.timeout, + ) + .await?; let mut resps_vals = vec![]; for resp in resps { @@ -210,9 +232,15 @@ impl<F: TableFormat + 'static> Table<F> { continue; } } - return Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp))) + return Err(Error::Message(format!( + "Invalid reply to TableRPC: {:?}", + resp + ))); } - eprintln!("Table RPC responses: {}", serde_json::to_string(&resps_vals)?); + eprintln!( + "Table RPC responses: {}", + serde_json::to_string(&resps_vals)? + ); Ok(resps_vals) } @@ -226,7 +254,7 @@ impl<F: TableFormat + 'static> Table<F> { self.handle_update(pairs).await?; Ok(TableRPC::Ok) } - _ => Err(Error::RPCError(format!("Unexpected table RPC"))) + _ => Err(Error::RPCError(format!("Unexpected table RPC"))), } } @@ -254,7 +282,7 @@ impl<F: TableFormat + 'static> Table<F> { new_entry.merge(&update); (Some(old_entry), new_entry) } - None => (None, update.clone()) + None => (None, update.clone()), }; let new_bytes = rmp_to_vec_all_named(&new_entry) |