From 69f1d8fef23149e45189c296e0c0d23e040cbb0e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 17 Apr 2020 17:09:57 +0200 Subject: WIP TODOs: - ensure sync goes both way - finish sending blocks to other nodes when they need them before deleting --- src/table.rs | 118 +++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 87 insertions(+), 31 deletions(-) (limited to 'src/table.rs') diff --git a/src/table.rs b/src/table.rs index 6892c9f5..40114aec 100644 --- a/src/table.rs +++ b/src/table.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, BTreeMap}; use std::sync::Arc; use std::time::Duration; @@ -60,10 +60,11 @@ pub enum TableRPC { ReadEntry(F::P, F::S), ReadEntryResponse(Option), + ReadRange(F::P, F::S, Option, usize), + Update(Vec>), - SyncChecksums(Vec), - SyncDifferentSet(Vec), + SyncRPC(SyncRPC), } pub trait PartitionKey { @@ -118,11 +119,15 @@ pub trait TableSchema: Send + Sync { type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; type E: Entry; + type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; async fn updated(&self, old: Option, new: Option); + fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { true } } impl Table { + // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== + pub async fn new( instance: F, system: Arc, @@ -144,18 +149,10 @@ impl Table { table } - pub fn rpc_handler(self: Arc) -> Box { - Box::new(TableRpcHandlerAdapter:: { table: self }) - } - pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let who = self - .system - .ring - .borrow() - .clone() - .walk_ring(&hash, self.param.replication_factor); + let ring = self.system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, self.param.replication_factor); eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); @@ -171,12 +168,8 @@ impl Table { for entry in entries.iter() { let hash = entry.partition_key().hash(); - let who = self - .system - .ring - .borrow() - .clone() - .walk_ring(&hash, self.param.replication_factor); + let ring = self.system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, self.param.replication_factor); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { if !call_list.contains_key(&node) { @@ -215,12 +208,8 @@ impl Table { sort_key: &F::S, ) -> Result, Error> { let hash = partition_key.hash(); - let who = self - .system - .ring - .borrow() - .clone() - .walk_ring(&hash, self.param.replication_factor); + let ring = self.system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, self.param.replication_factor); eprintln!("get who: {:?}", who); let rpc = &TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); @@ -251,15 +240,76 @@ impl Table { } if let Some(ret_entry) = &ret { if not_all_same { + let self2 = self.clone(); + let ent2 = ret_entry.clone(); self.system .background - .spawn(self.clone().repair_on_read(who, ret_entry.clone())); + .spawn(async move { + self2.repair_on_read(&who[..], ent2).await + }); } } Ok(ret) } - async fn repair_on_read(self: Arc, who: Vec, what: F::E) -> Result<(), Error> { + pub async fn get_range( + self: &Arc, + partition_key: &F::P, + begin_sort_key: &F::S, + filter: Option, + limit: usize, + ) -> Result, Error> { + let hash = partition_key.hash(); + let ring = self.system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, self.param.replication_factor); + + let rpc = &TableRPC::::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit); + let resps = self + .rpc_try_call_many(&who[..], &rpc, self.param.read_quorum) + .await?; + + let mut ret = BTreeMap::new(); + let mut to_repair = BTreeMap::new(); + for resp in resps { + if let TableRPC::Update(entries) = resp { + for entry_bytes in entries.iter() { + let entry = rmp_serde::decode::from_read_ref::<_, F::E>(entry_bytes.as_slice())?; + let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); + match ret.remove(&entry_key) { + None => { + ret.insert(entry_key, Some(entry)); + } + Some(Some(mut prev)) => { + let must_repair = prev != entry; + prev.merge(&entry); + if must_repair { + to_repair.insert(entry_key.clone(), Some(prev.clone())); + } + ret.insert(entry_key, Some(prev)); + } + Some(None) => unreachable!(), + } + } + } + } + if !to_repair.is_empty() { + let self2 = self.clone(); + self.system + .background + .spawn(async move { + for (_, v) in to_repair.iter_mut() { + self2.repair_on_read(&who[..], v.take().unwrap()).await?; + } + Ok(()) + }); + } + let ret_vec = ret.iter_mut().take(limit).map(|(_k, v)| v.take().unwrap()).collect::>(); + Ok(ret_vec) + } + + // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS =============== + + async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); self.rpc_try_call_many(&who[..], &TableRPC::::Update(vec![what_enc]), who.len()) .await?; @@ -322,6 +372,12 @@ impl Table { ))) } + // =============== HANDLERS FOR RPC OPERATIONS (SERVER SIDE) ============== + + pub fn rpc_handler(self: Arc) -> Box { + Box::new(TableRpcHandlerAdapter:: { table: self }) + } + async fn handle(self: &Arc, msg: TableRPC) -> Result, Error> { match msg { TableRPC::ReadEntry(key, sort_key) => { @@ -332,12 +388,12 @@ impl Table { self.handle_update(pairs).await?; Ok(TableRPC::Ok) } - TableRPC::SyncChecksums(checksums) => { + TableRPC::SyncRPC(rpc) => { let syncer = self.syncer.read().await.as_ref().unwrap().clone(); - let differing = syncer - .handle_checksum_rpc(&checksums[..], self.system.background.stop_signal.clone()) + let response = syncer + .handle_rpc(&rpc, self.system.background.stop_signal.clone()) .await?; - Ok(TableRPC::SyncDifferentSet(differing)) + Ok(TableRPC::SyncRPC(response)) } _ => Err(Error::RPCError(format!("Unexpected table RPC"))), } -- cgit v1.2.3