diff options
Diffstat (limited to 'src/table/table.rs')
-rw-r--r-- | src/table/table.rs | 524 |
1 files changed, 524 insertions, 0 deletions
diff --git a/src/table/table.rs b/src/table/table.rs new file mode 100644 index 00000000..50e8739a --- /dev/null +++ b/src/table/table.rs @@ -0,0 +1,524 @@ +use std::collections::{BTreeMap, HashMap}; +use std::sync::Arc; +use std::time::Duration; + +use arc_swap::ArcSwapOption; +use async_trait::async_trait; +use futures::stream::*; +use serde::{Deserialize, Serialize}; +use serde_bytes::ByteBuf; + +use crate::data::*; +use crate::error::Error; + +use crate::rpc::membership::{Ring, System}; +use crate::rpc::rpc_client::*; +use crate::rpc::rpc_server::*; + +use crate::table::table_sync::*; + +const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); + +pub struct Table<F: TableSchema, R: TableReplication> { + pub instance: F, + pub replication: R, + + pub name: String, + pub rpc_client: Arc<RpcClient<TableRPC<F>>>, + + pub system: Arc<System>, + pub store: sled::Tree, + pub syncer: ArcSwapOption<TableSyncer<F, R>>, +} + +#[derive(Serialize, Deserialize)] +pub enum TableRPC<F: TableSchema> { + Ok, + + ReadEntry(F::P, F::S), + ReadEntryResponse(Option<ByteBuf>), + + // Read range: read all keys in partition P, possibly starting at a certain sort key offset + ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize), + + Update(Vec<Arc<ByteBuf>>), + + SyncRPC(SyncRPC), +} + +impl<F: TableSchema> RpcMessage for TableRPC<F> {} + +pub trait PartitionKey { + fn hash(&self) -> Hash; +} + +pub trait SortKey { + fn sort_key(&self) -> &[u8]; +} + +pub trait Entry<P: PartitionKey, S: SortKey>: + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync +{ + fn partition_key(&self) -> &P; + fn sort_key(&self) -> &S; + + fn merge(&mut self, other: &Self); +} + +#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct EmptyKey; +impl SortKey for EmptyKey { + fn sort_key(&self) -> &[u8] { + &[] + } +} +impl PartitionKey for EmptyKey { + fn hash(&self) -> Hash { + [0u8; 32].into() + } +} + +impl<T: AsRef<str>> PartitionKey for T { + fn hash(&self) -> Hash { + hash(self.as_ref().as_bytes()) + } +} +impl<T: AsRef<str>> SortKey for T { + fn sort_key(&self) -> &[u8] { + self.as_ref().as_bytes() + } +} + +impl PartitionKey for Hash { + fn hash(&self) -> Hash { + self.clone() + } +} +impl SortKey for Hash { + fn sort_key(&self) -> &[u8] { + self.as_slice() + } +} + +#[async_trait] +pub trait TableSchema: Send + Sync { + type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; + type E: Entry<Self::P, Self::S>; + type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; + + async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error>; + fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { + true + } +} + +pub trait TableReplication: Send + Sync { + // See examples in table_sharded.rs and table_fullcopy.rs + // To understand various replication methods + + // Which nodes to send reads from + fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>; + fn read_quorum(&self) -> usize; + + // Which nodes to send writes to + fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>; + fn write_quorum(&self) -> usize; + fn max_write_errors(&self) -> usize; + fn epidemic_writes(&self) -> bool; + + // Which are the nodes that do actually replicate the data + fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>; + fn split_points(&self, ring: &Ring) -> Vec<Hash>; +} + +impl<F, R> Table<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== + + pub async fn new( + instance: F, + replication: R, + system: Arc<System>, + db: &sled::Db, + name: String, + rpc_server: &mut RpcServer, + ) -> Arc<Self> { + let store = db.open_tree(&name).expect("Unable to open DB tree"); + + let rpc_path = format!("table_{}", name); + let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path); + + let table = Arc::new(Self { + instance, + replication, + name, + rpc_client, + system, + store, + syncer: ArcSwapOption::from(None), + }); + table.clone().register_handler(rpc_server, rpc_path); + + let syncer = TableSyncer::launch(table.clone()).await; + table.syncer.swap(Some(syncer)); + + table + } + + pub async fn insert(&self, e: &F::E) -> Result<(), Error> { + let hash = e.partition_key().hash(); + let who = self.replication.write_nodes(&hash, &self.system); + //eprintln!("insert who: {:?}", who); + + let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); + let rpc = TableRPC::<F>::Update(vec![e_enc]); + + self.rpc_client + .try_call_many( + &who[..], + rpc, + RequestStrategy::with_quorum(self.replication.write_quorum()) + .with_timeout(TABLE_RPC_TIMEOUT), + ) + .await?; + Ok(()) + } + + pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { + let mut call_list = HashMap::new(); + + for entry in entries.iter() { + let hash = entry.partition_key().hash(); + let who = self.replication.write_nodes(&hash, &self.system); + let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); + for node in who { + if !call_list.contains_key(&node) { + call_list.insert(node, vec![]); + } + call_list.get_mut(&node).unwrap().push(e_enc.clone()); + } + } + + let call_futures = call_list.drain().map(|(node, entries)| async move { + let rpc = TableRPC::<F>::Update(entries); + + let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; + Ok::<_, Error>((node, resp)) + }); + let mut resps = call_futures.collect::<FuturesUnordered<_>>(); + let mut errors = vec![]; + + while let Some(resp) = resps.next().await { + if let Err(e) = resp { + errors.push(e); + } + } + if errors.len() > self.replication.max_write_errors() { + Err(Error::Message("Too many errors".into())) + } else { + Ok(()) + } + } + + pub async fn get( + self: &Arc<Self>, + partition_key: &F::P, + sort_key: &F::S, + ) -> Result<Option<F::E>, Error> { + let hash = partition_key.hash(); + let who = self.replication.read_nodes(&hash, &self.system); + //eprintln!("get who: {:?}", who); + + let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); + let resps = self + .rpc_client + .try_call_many( + &who[..], + rpc, + RequestStrategy::with_quorum(self.replication.read_quorum()) + .with_timeout(TABLE_RPC_TIMEOUT) + .interrupt_after_quorum(true), + ) + .await?; + + let mut ret = None; + let mut not_all_same = false; + for resp in resps { + if let TableRPC::ReadEntryResponse(value) = resp { + if let Some(v_bytes) = value { + let v = rmp_serde::decode::from_read_ref::<_, F::E>(v_bytes.as_slice())?; + ret = match ret { + None => Some(v), + Some(mut x) => { + if x != v { + not_all_same = true; + x.merge(&v); + } + Some(x) + } + } + } + } else { + return Err(Error::Message(format!("Invalid return value to read"))); + } + } + if let Some(ret_entry) = &ret { + if not_all_same { + let self2 = self.clone(); + let ent2 = ret_entry.clone(); + self.system + .background + .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); + } + } + Ok(ret) + } + + pub async fn get_range( + self: &Arc<Self>, + partition_key: &F::P, + begin_sort_key: Option<F::S>, + filter: Option<F::Filter>, + limit: usize, + ) -> Result<Vec<F::E>, Error> { + let hash = partition_key.hash(); + let who = self.replication.read_nodes(&hash, &self.system); + + let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); + + let resps = self + .rpc_client + .try_call_many( + &who[..], + rpc, + RequestStrategy::with_quorum(self.replication.read_quorum()) + .with_timeout(TABLE_RPC_TIMEOUT) + .interrupt_after_quorum(true), + ) + .await?; + + let mut ret = BTreeMap::new(); + let mut to_repair = BTreeMap::new(); + for resp in resps { + if let TableRPC::Update(entries) = resp { + for entry_bytes in entries.iter() { + let entry = + rmp_serde::decode::from_read_ref::<_, F::E>(entry_bytes.as_slice())?; + let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); + match ret.remove(&entry_key) { + None => { + ret.insert(entry_key, Some(entry)); + } + Some(Some(mut prev)) => { + let must_repair = prev != entry; + prev.merge(&entry); + if must_repair { + to_repair.insert(entry_key.clone(), Some(prev.clone())); + } + ret.insert(entry_key, Some(prev)); + } + Some(None) => unreachable!(), + } + } + } + } + if !to_repair.is_empty() { + let self2 = self.clone(); + self.system.background.spawn_cancellable(async move { + for (_, v) in to_repair.iter_mut() { + self2.repair_on_read(&who[..], v.take().unwrap()).await?; + } + Ok(()) + }); + } + let ret_vec = ret + .iter_mut() + .take(limit) + .map(|(_k, v)| v.take().unwrap()) + .collect::<Vec<_>>(); + Ok(ret_vec) + } + + // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS =============== + + async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { + let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); + self.rpc_client + .try_call_many( + &who[..], + TableRPC::<F>::Update(vec![what_enc]), + RequestStrategy::with_quorum(who.len()).with_timeout(TABLE_RPC_TIMEOUT), + ) + .await?; + Ok(()) + } + + // =============== HANDLERS FOR RPC OPERATIONS (SERVER SIDE) ============== + + fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) { + let self2 = self.clone(); + rpc_server.add_handler::<TableRPC<F>, _, _>(path, move |msg, _addr| { + let self2 = self2.clone(); + async move { self2.handle(&msg).await } + }); + + let self2 = self.clone(); + self.rpc_client + .set_local_handler(self.system.id, move |msg| { + let self2 = self2.clone(); + async move { self2.handle(&msg).await } + }); + } + + async fn handle(self: &Arc<Self>, msg: &TableRPC<F>) -> Result<TableRPC<F>, Error> { + match msg { + TableRPC::ReadEntry(key, sort_key) => { + let value = self.handle_read_entry(key, sort_key)?; + Ok(TableRPC::ReadEntryResponse(value)) + } + TableRPC::ReadRange(key, begin_sort_key, filter, limit) => { + let values = self.handle_read_range(key, begin_sort_key, filter, *limit)?; + Ok(TableRPC::Update(values)) + } + TableRPC::Update(pairs) => { + self.handle_update(pairs).await?; + Ok(TableRPC::Ok) + } + TableRPC::SyncRPC(rpc) => { + let syncer = self.syncer.load_full().unwrap(); + let response = syncer + .handle_rpc(rpc, self.system.background.stop_signal.clone()) + .await?; + Ok(TableRPC::SyncRPC(response)) + } + _ => Err(Error::BadRequest(format!("Unexpected table RPC"))), + } + } + + fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> { + let tree_key = self.tree_key(p, s); + if let Some(bytes) = self.store.get(&tree_key)? { + Ok(Some(ByteBuf::from(bytes.to_vec()))) + } else { + Ok(None) + } + } + + fn handle_read_range( + &self, + p: &F::P, + s: &Option<F::S>, + filter: &Option<F::Filter>, + limit: usize, + ) -> Result<Vec<Arc<ByteBuf>>, Error> { + let partition_hash = p.hash(); + let first_key = match s { + None => partition_hash.to_vec(), + Some(sk) => self.tree_key(p, sk), + }; + let mut ret = vec![]; + for item in self.store.range(first_key..) { + let (key, value) = item?; + if &key[..32] != partition_hash.as_slice() { + break; + } + let keep = match filter { + None => true, + Some(f) => { + let entry = rmp_serde::decode::from_read_ref::<_, F::E>(value.as_ref())?; + F::matches_filter(&entry, f) + } + }; + if keep { + ret.push(Arc::new(ByteBuf::from(value.as_ref()))); + } + if ret.len() >= limit { + break; + } + } + Ok(ret) + } + + pub async fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> { + let syncer = self.syncer.load_full().unwrap(); + let mut epidemic_propagate = vec![]; + + for update_bytes in entries.iter() { + let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?; + + let tree_key = self.tree_key(update.partition_key(), update.sort_key()); + + let (old_entry, new_entry) = self.store.transaction(|db| { + let (old_entry, new_entry) = match db.get(&tree_key)? { + Some(prev_bytes) => { + let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes) + .map_err(Error::RMPDecode) + .map_err(sled::ConflictableTransactionError::Abort)?; + let mut new_entry = old_entry.clone(); + new_entry.merge(&update); + (Some(old_entry), new_entry) + } + None => (None, update.clone()), + }; + + let new_bytes = rmp_to_vec_all_named(&new_entry) + .map_err(Error::RMPEncode) + .map_err(sled::ConflictableTransactionError::Abort)?; + db.insert(tree_key.clone(), new_bytes)?; + Ok((old_entry, new_entry)) + })?; + + if old_entry.as_ref() != Some(&new_entry) { + if self.replication.epidemic_writes() { + epidemic_propagate.push(new_entry.clone()); + } + + self.instance.updated(old_entry, Some(new_entry)).await?; + self.system + .background + .spawn_cancellable(syncer.clone().invalidate(tree_key)); + } + } + + if epidemic_propagate.len() > 0 { + let self2 = self.clone(); + self.system + .background + .spawn_cancellable(async move { self2.insert_many(&epidemic_propagate[..]).await }); + } + + Ok(()) + } + + pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> { + let syncer = self.syncer.load_full().unwrap(); + + debug!("({}) Deleting range {:?} - {:?}", self.name, begin, end); + let mut count = 0; + while let Some((key, _value)) = self.store.get_lt(end.as_slice())? { + if key.as_ref() < begin.as_slice() { + break; + } + if let Some(old_val) = self.store.remove(&key)? { + let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?; + self.instance.updated(Some(old_entry), None).await?; + self.system + .background + .spawn_cancellable(syncer.clone().invalidate(key.to_vec())); + count += 1; + } + } + debug!("({}) {} entries deleted", self.name, count); + Ok(()) + } + + fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { + let mut ret = p.hash().to_vec(); + ret.extend(s.sort_key()); + ret + } +} |