diff options
Diffstat (limited to 'src/table.rs')
-rw-r--r-- | src/table.rs | 87 |
1 files changed, 57 insertions, 30 deletions
diff --git a/src/table.rs b/src/table.rs index f7354376..d5357277 100644 --- a/src/table.rs +++ b/src/table.rs @@ -10,30 +10,23 @@ use serde_bytes::ByteBuf; use crate::data::*; use crate::error::Error; -use crate::membership::System; +use crate::membership::{Ring, System}; use crate::rpc_client::*; use crate::rpc_server::*; use crate::table_sync::*; -pub struct Table<F: TableSchema> { +const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); + +pub struct Table<F: TableSchema, R: TableReplication> { pub instance: F, + pub replication: R, pub name: String, pub rpc_client: Arc<RpcClient<TableRPC<F>>>, pub system: Arc<System>, pub store: sled::Tree, - pub syncer: ArcSwapOption<TableSyncer<F>>, - - pub param: TableReplicationParams, -} - -#[derive(Clone)] -pub struct TableReplicationParams { - pub replication_factor: usize, - pub read_quorum: usize, - pub write_quorum: usize, - pub timeout: Duration, + pub syncer: ArcSwapOption<TableSyncer<F, R>>, } #[derive(Serialize, Deserialize)] @@ -112,15 +105,38 @@ pub trait TableSchema: Send + Sync { } } -impl<F: TableSchema + 'static> Table<F> { +pub trait TableReplication: Send + Sync { + // See examples in table_sharded.rs and table_fullcopy.rs + // To understand various replication methods + + // Which nodes to send reads from + fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>; + fn read_quorum(&self) -> usize; + + // Which nodes to send writes to + fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>; + fn write_quorum(&self) -> usize; + fn max_write_errors(&self) -> usize; + fn epidemic_writes(&self) -> bool; + + // Which are the nodes that do actually replicate the data + fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>; + fn split_points(&self, ring: &Ring) -> Vec<Hash>; +} + +impl<F, R> Table<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== pub async fn new( instance: F, + replication: R, system: Arc<System>, db: &sled::Db, name: String, - param: TableReplicationParams, rpc_server: &mut RpcServer, ) -> Arc<Self> { let store = db.open_tree(&name).expect("Unable to open DB tree"); @@ -130,11 +146,11 @@ impl<F: TableSchema + 'static> Table<F> { let table = Arc::new(Self { instance, + replication, name, rpc_client, system, store, - param, syncer: ArcSwapOption::from(None), }); table.clone().register_handler(rpc_server, rpc_path); @@ -147,15 +163,19 @@ impl<F: TableSchema + 'static> Table<F> { pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let ring = self.system.ring.borrow().clone(); - let who = ring.walk_ring(&hash, self.param.replication_factor); + let who = self.replication.write_nodes(&hash, &self.system); //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = TableRPC::<F>::Update(vec![e_enc]); self.rpc_client - .try_call_many(&who[..], rpc, self.param.write_quorum, self.param.timeout) + .try_call_many( + &who[..], + rpc, + self.replication.write_quorum(), + TABLE_RPC_TIMEOUT, + ) .await?; Ok(()) } @@ -165,8 +185,7 @@ impl<F: TableSchema + 'static> Table<F> { for entry in entries.iter() { let hash = entry.partition_key().hash(); - let ring = self.system.ring.borrow().clone(); - let who = ring.walk_ring(&hash, self.param.replication_factor); + let who = self.replication.write_nodes(&hash, &self.system); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { if !call_list.contains_key(&node) { @@ -179,7 +198,7 @@ impl<F: TableSchema + 'static> Table<F> { let call_futures = call_list.drain().map(|(node, entries)| async move { let rpc = TableRPC::<F>::Update(entries); - let resp = self.rpc_client.call(&node, rpc, self.param.timeout).await?; + let resp = self.rpc_client.call(&node, rpc, TABLE_RPC_TIMEOUT).await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::<FuturesUnordered<_>>(); @@ -190,7 +209,7 @@ impl<F: TableSchema + 'static> Table<F> { errors.push(e); } } - if errors.len() > self.param.replication_factor - self.param.write_quorum { + if errors.len() > self.replication.max_write_errors() { Err(Error::Message("Too many errors".into())) } else { Ok(()) @@ -203,14 +222,18 @@ impl<F: TableSchema + 'static> Table<F> { sort_key: &F::S, ) -> Result<Option<F::E>, Error> { let hash = partition_key.hash(); - let ring = self.system.ring.borrow().clone(); - let who = ring.walk_ring(&hash, self.param.replication_factor); + let who = self.replication.read_nodes(&hash, &self.system); //eprintln!("get who: {:?}", who); let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self .rpc_client - .try_call_many(&who[..], rpc, self.param.read_quorum, self.param.timeout) + .try_call_many( + &who[..], + rpc, + self.replication.read_quorum(), + TABLE_RPC_TIMEOUT, + ) .await?; let mut ret = None; @@ -254,14 +277,18 @@ impl<F: TableSchema + 'static> Table<F> { limit: usize, ) -> Result<Vec<F::E>, Error> { let hash = partition_key.hash(); - let ring = self.system.ring.borrow().clone(); - let who = ring.walk_ring(&hash, self.param.replication_factor); + let who = self.replication.read_nodes(&hash, &self.system); let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit); let resps = self .rpc_client - .try_call_many(&who[..], rpc, self.param.read_quorum, self.param.timeout) + .try_call_many( + &who[..], + rpc, + self.replication.read_quorum(), + TABLE_RPC_TIMEOUT, + ) .await?; let mut ret = BTreeMap::new(); @@ -315,7 +342,7 @@ impl<F: TableSchema + 'static> Table<F> { &who[..], TableRPC::<F>::Update(vec![what_enc]), who.len(), - self.param.timeout, + TABLE_RPC_TIMEOUT, ) .await?; Ok(()) |