From c9c6b0dbd41e20d19b91c6615c46da6f45925bca Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 23 Apr 2020 17:05:46 +0000 Subject: Reorganize code --- src/table.rs | 522 ----------------------------------------------------------- 1 file changed, 522 deletions(-) delete mode 100644 src/table.rs (limited to 'src/table.rs') diff --git a/src/table.rs b/src/table.rs deleted file mode 100644 index a3d02d0c..00000000 --- a/src/table.rs +++ /dev/null @@ -1,522 +0,0 @@ -use std::collections::{BTreeMap, HashMap}; -use std::sync::Arc; -use std::time::Duration; - -use arc_swap::ArcSwapOption; -use async_trait::async_trait; -use futures::stream::*; -use serde::{Deserialize, Serialize}; -use serde_bytes::ByteBuf; - -use crate::data::*; -use crate::error::Error; -use crate::membership::{Ring, System}; -use crate::rpc_client::*; -use crate::rpc_server::*; -use crate::table_sync::*; - -const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); - -pub struct Table { - pub instance: F, - pub replication: R, - - pub name: String, - pub rpc_client: Arc>>, - - pub system: Arc, - pub store: sled::Tree, - pub syncer: ArcSwapOption>, -} - -#[derive(Serialize, Deserialize)] -pub enum TableRPC { - Ok, - - ReadEntry(F::P, F::S), - ReadEntryResponse(Option), - - // Read range: read all keys in partition P, possibly starting at a certain sort key offset - ReadRange(F::P, Option, Option, usize), - - Update(Vec>), - - SyncRPC(SyncRPC), -} - -impl RpcMessage for TableRPC {} - -pub trait PartitionKey { - fn hash(&self) -> Hash; -} - -pub trait SortKey { - fn sort_key(&self) -> &[u8]; -} - -pub trait Entry: - PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync -{ - fn partition_key(&self) -> &P; - fn sort_key(&self) -> &S; - - fn merge(&mut self, other: &Self); -} - -#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct EmptyKey; -impl SortKey for EmptyKey { - fn sort_key(&self) -> &[u8] { - &[] - } -} -impl PartitionKey for EmptyKey { - fn hash(&self) -> Hash { - [0u8; 32].into() - } -} - -impl> PartitionKey for T { - fn hash(&self) -> Hash { - hash(self.as_ref().as_bytes()) - } -} -impl> SortKey for T { - fn sort_key(&self) -> &[u8] { - self.as_ref().as_bytes() - } -} - -impl PartitionKey for Hash { - fn hash(&self) -> Hash { - self.clone() - } -} -impl SortKey for Hash { - fn sort_key(&self) -> &[u8] { - self.as_slice() - } -} - -#[async_trait] -pub trait TableSchema: Send + Sync { - type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; - type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; - type E: Entry; - type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; - - async fn updated(&self, old: Option, new: Option) -> Result<(), Error>; - fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { - true - } -} - -pub trait TableReplication: Send + Sync { - // See examples in table_sharded.rs and table_fullcopy.rs - // To understand various replication methods - - // Which nodes to send reads from - fn read_nodes(&self, hash: &Hash, system: &System) -> Vec; - fn read_quorum(&self) -> usize; - - // Which nodes to send writes to - fn write_nodes(&self, hash: &Hash, system: &System) -> Vec; - fn write_quorum(&self) -> usize; - fn max_write_errors(&self) -> usize; - fn epidemic_writes(&self) -> bool; - - // Which are the nodes that do actually replicate the data - fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec; - fn split_points(&self, ring: &Ring) -> Vec; -} - -impl Table -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ - // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== - - pub async fn new( - instance: F, - replication: R, - system: Arc, - db: &sled::Db, - name: String, - rpc_server: &mut RpcServer, - ) -> Arc { - let store = db.open_tree(&name).expect("Unable to open DB tree"); - - let rpc_path = format!("table_{}", name); - let rpc_client = system.rpc_client::>(&rpc_path); - - let table = Arc::new(Self { - instance, - replication, - name, - rpc_client, - system, - store, - syncer: ArcSwapOption::from(None), - }); - table.clone().register_handler(rpc_server, rpc_path); - - let syncer = TableSyncer::launch(table.clone()).await; - table.syncer.swap(Some(syncer)); - - table - } - - pub async fn insert(&self, e: &F::E) -> Result<(), Error> { - let hash = e.partition_key().hash(); - let who = self.replication.write_nodes(&hash, &self.system); - //eprintln!("insert who: {:?}", who); - - let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); - let rpc = TableRPC::::Update(vec![e_enc]); - - self.rpc_client - .try_call_many( - &who[..], - rpc, - RequestStrategy::with_quorum(self.replication.write_quorum()) - .with_timeout(TABLE_RPC_TIMEOUT), - ) - .await?; - Ok(()) - } - - pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { - let mut call_list = HashMap::new(); - - for entry in entries.iter() { - let hash = entry.partition_key().hash(); - let who = self.replication.write_nodes(&hash, &self.system); - let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); - for node in who { - if !call_list.contains_key(&node) { - call_list.insert(node, vec![]); - } - call_list.get_mut(&node).unwrap().push(e_enc.clone()); - } - } - - let call_futures = call_list.drain().map(|(node, entries)| async move { - let rpc = TableRPC::::Update(entries); - - let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; - Ok::<_, Error>((node, resp)) - }); - let mut resps = call_futures.collect::>(); - let mut errors = vec![]; - - while let Some(resp) = resps.next().await { - if let Err(e) = resp { - errors.push(e); - } - } - if errors.len() > self.replication.max_write_errors() { - Err(Error::Message("Too many errors".into())) - } else { - Ok(()) - } - } - - pub async fn get( - self: &Arc, - partition_key: &F::P, - sort_key: &F::S, - ) -> Result, Error> { - let hash = partition_key.hash(); - let who = self.replication.read_nodes(&hash, &self.system); - //eprintln!("get who: {:?}", who); - - let rpc = TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); - let resps = self - .rpc_client - .try_call_many( - &who[..], - rpc, - RequestStrategy::with_quorum(self.replication.read_quorum()) - .with_timeout(TABLE_RPC_TIMEOUT) - .interrupt_after_quorum(true), - ) - .await?; - - let mut ret = None; - let mut not_all_same = false; - for resp in resps { - if let TableRPC::ReadEntryResponse(value) = resp { - if let Some(v_bytes) = value { - let v = rmp_serde::decode::from_read_ref::<_, F::E>(v_bytes.as_slice())?; - ret = match ret { - None => Some(v), - Some(mut x) => { - if x != v { - not_all_same = true; - x.merge(&v); - } - Some(x) - } - } - } - } else { - return Err(Error::Message(format!("Invalid return value to read"))); - } - } - if let Some(ret_entry) = &ret { - if not_all_same { - let self2 = self.clone(); - let ent2 = ret_entry.clone(); - self.system - .background - .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); - } - } - Ok(ret) - } - - pub async fn get_range( - self: &Arc, - partition_key: &F::P, - begin_sort_key: Option, - filter: Option, - limit: usize, - ) -> Result, Error> { - let hash = partition_key.hash(); - let who = self.replication.read_nodes(&hash, &self.system); - - let rpc = TableRPC::::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); - - let resps = self - .rpc_client - .try_call_many( - &who[..], - rpc, - RequestStrategy::with_quorum(self.replication.read_quorum()) - .with_timeout(TABLE_RPC_TIMEOUT) - .interrupt_after_quorum(true), - ) - .await?; - - let mut ret = BTreeMap::new(); - let mut to_repair = BTreeMap::new(); - for resp in resps { - if let TableRPC::Update(entries) = resp { - for entry_bytes in entries.iter() { - let entry = - rmp_serde::decode::from_read_ref::<_, F::E>(entry_bytes.as_slice())?; - let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); - match ret.remove(&entry_key) { - None => { - ret.insert(entry_key, Some(entry)); - } - Some(Some(mut prev)) => { - let must_repair = prev != entry; - prev.merge(&entry); - if must_repair { - to_repair.insert(entry_key.clone(), Some(prev.clone())); - } - ret.insert(entry_key, Some(prev)); - } - Some(None) => unreachable!(), - } - } - } - } - if !to_repair.is_empty() { - let self2 = self.clone(); - self.system.background.spawn_cancellable(async move { - for (_, v) in to_repair.iter_mut() { - self2.repair_on_read(&who[..], v.take().unwrap()).await?; - } - Ok(()) - }); - } - let ret_vec = ret - .iter_mut() - .take(limit) - .map(|(_k, v)| v.take().unwrap()) - .collect::>(); - Ok(ret_vec) - } - - // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS =============== - - async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { - let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); - self.rpc_client - .try_call_many( - &who[..], - TableRPC::::Update(vec![what_enc]), - RequestStrategy::with_quorum(who.len()).with_timeout(TABLE_RPC_TIMEOUT), - ) - .await?; - Ok(()) - } - - // =============== HANDLERS FOR RPC OPERATIONS (SERVER SIDE) ============== - - fn register_handler(self: Arc, rpc_server: &mut RpcServer, path: String) { - let self2 = self.clone(); - rpc_server.add_handler::, _, _>(path, move |msg, _addr| { - let self2 = self2.clone(); - async move { self2.handle(&msg).await } - }); - - let self2 = self.clone(); - self.rpc_client - .set_local_handler(self.system.id, move |msg| { - let self2 = self2.clone(); - async move { self2.handle(&msg).await } - }); - } - - async fn handle(self: &Arc, msg: &TableRPC) -> Result, Error> { - match msg { - TableRPC::ReadEntry(key, sort_key) => { - let value = self.handle_read_entry(key, sort_key)?; - Ok(TableRPC::ReadEntryResponse(value)) - } - TableRPC::ReadRange(key, begin_sort_key, filter, limit) => { - let values = self.handle_read_range(key, begin_sort_key, filter, *limit)?; - Ok(TableRPC::Update(values)) - } - TableRPC::Update(pairs) => { - self.handle_update(pairs).await?; - Ok(TableRPC::Ok) - } - TableRPC::SyncRPC(rpc) => { - let syncer = self.syncer.load_full().unwrap(); - let response = syncer - .handle_rpc(rpc, self.system.background.stop_signal.clone()) - .await?; - Ok(TableRPC::SyncRPC(response)) - } - _ => Err(Error::BadRequest(format!("Unexpected table RPC"))), - } - } - - fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result, Error> { - let tree_key = self.tree_key(p, s); - if let Some(bytes) = self.store.get(&tree_key)? { - Ok(Some(ByteBuf::from(bytes.to_vec()))) - } else { - Ok(None) - } - } - - fn handle_read_range( - &self, - p: &F::P, - s: &Option, - filter: &Option, - limit: usize, - ) -> Result>, Error> { - let partition_hash = p.hash(); - let first_key = match s { - None => partition_hash.to_vec(), - Some(sk) => self.tree_key(p, sk), - }; - let mut ret = vec![]; - for item in self.store.range(first_key..) { - let (key, value) = item?; - if &key[..32] != partition_hash.as_slice() { - break; - } - let keep = match filter { - None => true, - Some(f) => { - let entry = rmp_serde::decode::from_read_ref::<_, F::E>(value.as_ref())?; - F::matches_filter(&entry, f) - } - }; - if keep { - ret.push(Arc::new(ByteBuf::from(value.as_ref()))); - } - if ret.len() >= limit { - break; - } - } - Ok(ret) - } - - pub async fn handle_update(self: &Arc, entries: &[Arc]) -> Result<(), Error> { - let syncer = self.syncer.load_full().unwrap(); - let mut epidemic_propagate = vec![]; - - for update_bytes in entries.iter() { - let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?; - - let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - - let (old_entry, new_entry) = self.store.transaction(|db| { - let (old_entry, new_entry) = match db.get(&tree_key)? { - Some(prev_bytes) => { - let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes) - .map_err(Error::RMPDecode) - .map_err(sled::ConflictableTransactionError::Abort)?; - let mut new_entry = old_entry.clone(); - new_entry.merge(&update); - (Some(old_entry), new_entry) - } - None => (None, update.clone()), - }; - - let new_bytes = rmp_to_vec_all_named(&new_entry) - .map_err(Error::RMPEncode) - .map_err(sled::ConflictableTransactionError::Abort)?; - db.insert(tree_key.clone(), new_bytes)?; - Ok((old_entry, new_entry)) - })?; - - if old_entry.as_ref() != Some(&new_entry) { - if self.replication.epidemic_writes() { - epidemic_propagate.push(new_entry.clone()); - } - - self.instance.updated(old_entry, Some(new_entry)).await?; - self.system - .background - .spawn_cancellable(syncer.clone().invalidate(tree_key)); - } - } - - if epidemic_propagate.len() > 0 { - let self2 = self.clone(); - self.system - .background - .spawn_cancellable(async move { self2.insert_many(&epidemic_propagate[..]).await }); - } - - Ok(()) - } - - pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> { - let syncer = self.syncer.load_full().unwrap(); - - debug!("({}) Deleting range {:?} - {:?}", self.name, begin, end); - let mut count = 0; - while let Some((key, _value)) = self.store.get_lt(end.as_slice())? { - if key.as_ref() < begin.as_slice() { - break; - } - if let Some(old_val) = self.store.remove(&key)? { - let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?; - self.instance.updated(Some(old_entry), None).await?; - self.system - .background - .spawn_cancellable(syncer.clone().invalidate(key.to_vec())); - count += 1; - } - } - debug!("({}) {} entries deleted", self.name, count); - Ok(()) - } - - fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { - let mut ret = p.hash().to_vec(); - ret.extend(s.sort_key()); - ret - } -} -- cgit v1.2.3