aboutsummaryrefslogtreecommitdiff
path: root/src/table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table.rs')
-rw-r--r--src/table.rs87
1 files changed, 57 insertions, 30 deletions
diff --git a/src/table.rs b/src/table.rs
index f7354376..d5357277 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -10,30 +10,23 @@ use serde_bytes::ByteBuf;
use crate::data::*;
use crate::error::Error;
-use crate::membership::System;
+use crate::membership::{Ring, System};
use crate::rpc_client::*;
use crate::rpc_server::*;
use crate::table_sync::*;
-pub struct Table<F: TableSchema> {
+const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
+
+pub struct Table<F: TableSchema, R: TableReplication> {
pub instance: F,
+ pub replication: R,
pub name: String,
pub rpc_client: Arc<RpcClient<TableRPC<F>>>,
pub system: Arc<System>,
pub store: sled::Tree,
- pub syncer: ArcSwapOption<TableSyncer<F>>,
-
- pub param: TableReplicationParams,
-}
-
-#[derive(Clone)]
-pub struct TableReplicationParams {
- pub replication_factor: usize,
- pub read_quorum: usize,
- pub write_quorum: usize,
- pub timeout: Duration,
+ pub syncer: ArcSwapOption<TableSyncer<F, R>>,
}
#[derive(Serialize, Deserialize)]
@@ -112,15 +105,38 @@ pub trait TableSchema: Send + Sync {
}
}
-impl<F: TableSchema + 'static> Table<F> {
+pub trait TableReplication: Send + Sync {
+ // See examples in table_sharded.rs and table_fullcopy.rs
+ // To understand various replication methods
+
+ // Which nodes to send reads from
+ fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
+ fn read_quorum(&self) -> usize;
+
+ // Which nodes to send writes to
+ fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
+ fn write_quorum(&self) -> usize;
+ fn max_write_errors(&self) -> usize;
+ fn epidemic_writes(&self) -> bool;
+
+ // Which are the nodes that do actually replicate the data
+ fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>;
+ fn split_points(&self, ring: &Ring) -> Vec<Hash>;
+}
+
+impl<F, R> Table<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
pub async fn new(
instance: F,
+ replication: R,
system: Arc<System>,
db: &sled::Db,
name: String,
- param: TableReplicationParams,
rpc_server: &mut RpcServer,
) -> Arc<Self> {
let store = db.open_tree(&name).expect("Unable to open DB tree");
@@ -130,11 +146,11 @@ impl<F: TableSchema + 'static> Table<F> {
let table = Arc::new(Self {
instance,
+ replication,
name,
rpc_client,
system,
store,
- param,
syncer: ArcSwapOption::from(None),
});
table.clone().register_handler(rpc_server, rpc_path);
@@ -147,15 +163,19 @@ impl<F: TableSchema + 'static> Table<F> {
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
- let ring = self.system.ring.borrow().clone();
- let who = ring.walk_ring(&hash, self.param.replication_factor);
+ let who = self.replication.write_nodes(&hash, &self.system);
//eprintln!("insert who: {:?}", who);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
let rpc = TableRPC::<F>::Update(vec![e_enc]);
self.rpc_client
- .try_call_many(&who[..], rpc, self.param.write_quorum, self.param.timeout)
+ .try_call_many(
+ &who[..],
+ rpc,
+ self.replication.write_quorum(),
+ TABLE_RPC_TIMEOUT,
+ )
.await?;
Ok(())
}
@@ -165,8 +185,7 @@ impl<F: TableSchema + 'static> Table<F> {
for entry in entries.iter() {
let hash = entry.partition_key().hash();
- let ring = self.system.ring.borrow().clone();
- let who = ring.walk_ring(&hash, self.param.replication_factor);
+ let who = self.replication.write_nodes(&hash, &self.system);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
for node in who {
if !call_list.contains_key(&node) {
@@ -179,7 +198,7 @@ impl<F: TableSchema + 'static> Table<F> {
let call_futures = call_list.drain().map(|(node, entries)| async move {
let rpc = TableRPC::<F>::Update(entries);
- let resp = self.rpc_client.call(&node, rpc, self.param.timeout).await?;
+ let resp = self.rpc_client.call(&node, rpc, TABLE_RPC_TIMEOUT).await?;
Ok::<_, Error>((node, resp))
});
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
@@ -190,7 +209,7 @@ impl<F: TableSchema + 'static> Table<F> {
errors.push(e);
}
}
- if errors.len() > self.param.replication_factor - self.param.write_quorum {
+ if errors.len() > self.replication.max_write_errors() {
Err(Error::Message("Too many errors".into()))
} else {
Ok(())
@@ -203,14 +222,18 @@ impl<F: TableSchema + 'static> Table<F> {
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash();
- let ring = self.system.ring.borrow().clone();
- let who = ring.walk_ring(&hash, self.param.replication_factor);
+ let who = self.replication.read_nodes(&hash, &self.system);
//eprintln!("get who: {:?}", who);
let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self
.rpc_client
- .try_call_many(&who[..], rpc, self.param.read_quorum, self.param.timeout)
+ .try_call_many(
+ &who[..],
+ rpc,
+ self.replication.read_quorum(),
+ TABLE_RPC_TIMEOUT,
+ )
.await?;
let mut ret = None;
@@ -254,14 +277,18 @@ impl<F: TableSchema + 'static> Table<F> {
limit: usize,
) -> Result<Vec<F::E>, Error> {
let hash = partition_key.hash();
- let ring = self.system.ring.borrow().clone();
- let who = ring.walk_ring(&hash, self.param.replication_factor);
+ let who = self.replication.read_nodes(&hash, &self.system);
let rpc =
TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit);
let resps = self
.rpc_client
- .try_call_many(&who[..], rpc, self.param.read_quorum, self.param.timeout)
+ .try_call_many(
+ &who[..],
+ rpc,
+ self.replication.read_quorum(),
+ TABLE_RPC_TIMEOUT,
+ )
.await?;
let mut ret = BTreeMap::new();
@@ -315,7 +342,7 @@ impl<F: TableSchema + 'static> Table<F> {
&who[..],
TableRPC::<F>::Update(vec![what_enc]),
who.len(),
- self.param.timeout,
+ TABLE_RPC_TIMEOUT,
)
.await?;
Ok(())