aboutsummaryrefslogtreecommitdiff
path: root/src/table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table.rs')
-rw-r--r--src/table.rs118
1 files changed, 87 insertions, 31 deletions
diff --git a/src/table.rs b/src/table.rs
index 6892c9f5..40114aec 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+use std::collections::{HashMap, BTreeMap};
use std::sync::Arc;
use std::time::Duration;
@@ -60,10 +60,11 @@ pub enum TableRPC<F: TableSchema> {
ReadEntry(F::P, F::S),
ReadEntryResponse(Option<ByteBuf>),
+ ReadRange(F::P, F::S, Option<F::Filter>, usize),
+
Update(Vec<Arc<ByteBuf>>),
- SyncChecksums(Vec<RangeChecksum>),
- SyncDifferentSet(Vec<SyncRange>),
+ SyncRPC(SyncRPC),
}
pub trait PartitionKey {
@@ -118,11 +119,15 @@ pub trait TableSchema: Send + Sync {
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
type E: Entry<Self::P, Self::S>;
+ type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>);
+ fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { true }
}
impl<F: TableSchema + 'static> Table<F> {
+ // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
+
pub async fn new(
instance: F,
system: Arc<System>,
@@ -144,18 +149,10 @@ impl<F: TableSchema + 'static> Table<F> {
table
}
- pub fn rpc_handler(self: Arc<Self>) -> Box<dyn TableRpcHandler + Send + Sync> {
- Box::new(TableRpcHandlerAdapter::<F> { table: self })
- }
-
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
- let who = self
- .system
- .ring
- .borrow()
- .clone()
- .walk_ring(&hash, self.param.replication_factor);
+ let ring = self.system.ring.borrow().clone();
+ let who = ring.walk_ring(&hash, self.param.replication_factor);
eprintln!("insert who: {:?}", who);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
@@ -171,12 +168,8 @@ impl<F: TableSchema + 'static> Table<F> {
for entry in entries.iter() {
let hash = entry.partition_key().hash();
- let who = self
- .system
- .ring
- .borrow()
- .clone()
- .walk_ring(&hash, self.param.replication_factor);
+ let ring = self.system.ring.borrow().clone();
+ let who = ring.walk_ring(&hash, self.param.replication_factor);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
for node in who {
if !call_list.contains_key(&node) {
@@ -215,12 +208,8 @@ impl<F: TableSchema + 'static> Table<F> {
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash();
- let who = self
- .system
- .ring
- .borrow()
- .clone()
- .walk_ring(&hash, self.param.replication_factor);
+ let ring = self.system.ring.borrow().clone();
+ let who = ring.walk_ring(&hash, self.param.replication_factor);
eprintln!("get who: {:?}", who);
let rpc = &TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
@@ -251,15 +240,76 @@ impl<F: TableSchema + 'static> Table<F> {
}
if let Some(ret_entry) = &ret {
if not_all_same {
+ let self2 = self.clone();
+ let ent2 = ret_entry.clone();
self.system
.background
- .spawn(self.clone().repair_on_read(who, ret_entry.clone()));
+ .spawn(async move {
+ self2.repair_on_read(&who[..], ent2).await
+ });
}
}
Ok(ret)
}
- async fn repair_on_read(self: Arc<Self>, who: Vec<UUID>, what: F::E) -> Result<(), Error> {
+ pub async fn get_range(
+ self: &Arc<Self>,
+ partition_key: &F::P,
+ begin_sort_key: &F::S,
+ filter: Option<F::Filter>,
+ limit: usize,
+ ) -> Result<Vec<F::E>, Error> {
+ let hash = partition_key.hash();
+ let ring = self.system.ring.borrow().clone();
+ let who = ring.walk_ring(&hash, self.param.replication_factor);
+
+ let rpc = &TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit);
+ let resps = self
+ .rpc_try_call_many(&who[..], &rpc, self.param.read_quorum)
+ .await?;
+
+ let mut ret = BTreeMap::new();
+ let mut to_repair = BTreeMap::new();
+ for resp in resps {
+ if let TableRPC::Update(entries) = resp {
+ for entry_bytes in entries.iter() {
+ let entry = rmp_serde::decode::from_read_ref::<_, F::E>(entry_bytes.as_slice())?;
+ let entry_key = self.tree_key(entry.partition_key(), entry.sort_key());
+ match ret.remove(&entry_key) {
+ None => {
+ ret.insert(entry_key, Some(entry));
+ }
+ Some(Some(mut prev)) => {
+ let must_repair = prev != entry;
+ prev.merge(&entry);
+ if must_repair {
+ to_repair.insert(entry_key.clone(), Some(prev.clone()));
+ }
+ ret.insert(entry_key, Some(prev));
+ }
+ Some(None) => unreachable!(),
+ }
+ }
+ }
+ }
+ if !to_repair.is_empty() {
+ let self2 = self.clone();
+ self.system
+ .background
+ .spawn(async move {
+ for (_, v) in to_repair.iter_mut() {
+ self2.repair_on_read(&who[..], v.take().unwrap()).await?;
+ }
+ Ok(())
+ });
+ }
+ let ret_vec = ret.iter_mut().take(limit).map(|(_k, v)| v.take().unwrap()).collect::<Vec<_>>();
+ Ok(ret_vec)
+ }
+
+ // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============
+
+ async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
self.rpc_try_call_many(&who[..], &TableRPC::<F>::Update(vec![what_enc]), who.len())
.await?;
@@ -322,6 +372,12 @@ impl<F: TableSchema + 'static> Table<F> {
)))
}
+ // =============== HANDLERS FOR RPC OPERATIONS (SERVER SIDE) ==============
+
+ pub fn rpc_handler(self: Arc<Self>) -> Box<dyn TableRpcHandler + Send + Sync> {
+ Box::new(TableRpcHandlerAdapter::<F> { table: self })
+ }
+
async fn handle(self: &Arc<Self>, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> {
match msg {
TableRPC::ReadEntry(key, sort_key) => {
@@ -332,12 +388,12 @@ impl<F: TableSchema + 'static> Table<F> {
self.handle_update(pairs).await?;
Ok(TableRPC::Ok)
}
- TableRPC::SyncChecksums(checksums) => {
+ TableRPC::SyncRPC(rpc) => {
let syncer = self.syncer.read().await.as_ref().unwrap().clone();
- let differing = syncer
- .handle_checksum_rpc(&checksums[..], self.system.background.stop_signal.clone())
+ let response = syncer
+ .handle_rpc(&rpc, self.system.background.stop_signal.clone())
.await?;
- Ok(TableRPC::SyncDifferentSet(differing))
+ Ok(TableRPC::SyncRPC(response))
}
_ => Err(Error::RPCError(format!("Unexpected table RPC"))),
}