diff options
Diffstat (limited to 'src/table.rs')
-rw-r--r-- | src/table.rs | 118 |
1 files changed, 87 insertions, 31 deletions
diff --git a/src/table.rs b/src/table.rs index 6892c9f5..40114aec 100644 --- a/src/table.rs +++ b/src/table.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, BTreeMap}; use std::sync::Arc; use std::time::Duration; @@ -60,10 +60,11 @@ pub enum TableRPC<F: TableSchema> { ReadEntry(F::P, F::S), ReadEntryResponse(Option<ByteBuf>), + ReadRange(F::P, F::S, Option<F::Filter>, usize), + Update(Vec<Arc<ByteBuf>>), - SyncChecksums(Vec<RangeChecksum>), - SyncDifferentSet(Vec<SyncRange>), + SyncRPC(SyncRPC), } pub trait PartitionKey { @@ -118,11 +119,15 @@ pub trait TableSchema: Send + Sync { type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; type E: Entry<Self::P, Self::S>; + type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>); + fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { true } } impl<F: TableSchema + 'static> Table<F> { + // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== + pub async fn new( instance: F, system: Arc<System>, @@ -144,18 +149,10 @@ impl<F: TableSchema + 'static> Table<F> { table } - pub fn rpc_handler(self: Arc<Self>) -> Box<dyn TableRpcHandler + Send + Sync> { - Box::new(TableRpcHandlerAdapter::<F> { table: self }) - } - pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let who = self - .system - .ring - .borrow() - .clone() - .walk_ring(&hash, self.param.replication_factor); + let ring = self.system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, self.param.replication_factor); eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); @@ -171,12 +168,8 @@ impl<F: TableSchema + 'static> Table<F> { for entry in entries.iter() { let hash = entry.partition_key().hash(); - let who = self - .system - .ring - .borrow() - .clone() - .walk_ring(&hash, self.param.replication_factor); + let ring = self.system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, self.param.replication_factor); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { if !call_list.contains_key(&node) { @@ -215,12 +208,8 @@ impl<F: TableSchema + 'static> Table<F> { sort_key: &F::S, ) -> Result<Option<F::E>, Error> { let hash = partition_key.hash(); - let who = self - .system - .ring - .borrow() - .clone() - .walk_ring(&hash, self.param.replication_factor); + let ring = self.system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, self.param.replication_factor); eprintln!("get who: {:?}", who); let rpc = &TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); @@ -251,15 +240,76 @@ impl<F: TableSchema + 'static> Table<F> { } if let Some(ret_entry) = &ret { if not_all_same { + let self2 = self.clone(); + let ent2 = ret_entry.clone(); self.system .background - .spawn(self.clone().repair_on_read(who, ret_entry.clone())); + .spawn(async move { + self2.repair_on_read(&who[..], ent2).await + }); } } Ok(ret) } - async fn repair_on_read(self: Arc<Self>, who: Vec<UUID>, what: F::E) -> Result<(), Error> { + pub async fn get_range( + self: &Arc<Self>, + partition_key: &F::P, + begin_sort_key: &F::S, + filter: Option<F::Filter>, + limit: usize, + ) -> Result<Vec<F::E>, Error> { + let hash = partition_key.hash(); + let ring = self.system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, self.param.replication_factor); + + let rpc = &TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit); + let resps = self + .rpc_try_call_many(&who[..], &rpc, self.param.read_quorum) + .await?; + + let mut ret = BTreeMap::new(); + let mut to_repair = BTreeMap::new(); + for resp in resps { + if let TableRPC::Update(entries) = resp { + for entry_bytes in entries.iter() { + let entry = rmp_serde::decode::from_read_ref::<_, F::E>(entry_bytes.as_slice())?; + let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); + match ret.remove(&entry_key) { + None => { + ret.insert(entry_key, Some(entry)); + } + Some(Some(mut prev)) => { + let must_repair = prev != entry; + prev.merge(&entry); + if must_repair { + to_repair.insert(entry_key.clone(), Some(prev.clone())); + } + ret.insert(entry_key, Some(prev)); + } + Some(None) => unreachable!(), + } + } + } + } + if !to_repair.is_empty() { + let self2 = self.clone(); + self.system + .background + .spawn(async move { + for (_, v) in to_repair.iter_mut() { + self2.repair_on_read(&who[..], v.take().unwrap()).await?; + } + Ok(()) + }); + } + let ret_vec = ret.iter_mut().take(limit).map(|(_k, v)| v.take().unwrap()).collect::<Vec<_>>(); + Ok(ret_vec) + } + + // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS =============== + + async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); self.rpc_try_call_many(&who[..], &TableRPC::<F>::Update(vec![what_enc]), who.len()) .await?; @@ -322,6 +372,12 @@ impl<F: TableSchema + 'static> Table<F> { ))) } + // =============== HANDLERS FOR RPC OPERATIONS (SERVER SIDE) ============== + + pub fn rpc_handler(self: Arc<Self>) -> Box<dyn TableRpcHandler + Send + Sync> { + Box::new(TableRpcHandlerAdapter::<F> { table: self }) + } + async fn handle(self: &Arc<Self>, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> { match msg { TableRPC::ReadEntry(key, sort_key) => { @@ -332,12 +388,12 @@ impl<F: TableSchema + 'static> Table<F> { self.handle_update(pairs).await?; Ok(TableRPC::Ok) } - TableRPC::SyncChecksums(checksums) => { + TableRPC::SyncRPC(rpc) => { let syncer = self.syncer.read().await.as_ref().unwrap().clone(); - let differing = syncer - .handle_checksum_rpc(&checksums[..], self.system.background.stop_signal.clone()) + let response = syncer + .handle_rpc(&rpc, self.system.background.stop_signal.clone()) .await?; - Ok(TableRPC::SyncDifferentSet(differing)) + Ok(TableRPC::SyncRPC(response)) } _ => Err(Error::RPCError(format!("Unexpected table RPC"))), } |