use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; use futures::stream::*; use tokio::sync::RwLock; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use crate::data::*; use crate::error::Error; use crate::membership::System; use crate::proto::*; use crate::rpc_client::*; use crate::table_sync::*; pub struct Table { pub instance: F, pub name: String, pub system: Arc, pub store: sled::Tree, pub syncer: RwLock>>>, pub param: TableReplicationParams, } #[derive(Clone)] pub struct TableReplicationParams { pub replication_factor: usize, pub read_quorum: usize, pub write_quorum: usize, pub timeout: Duration, } #[async_trait] pub trait TableRpcHandler { async fn handle(&self, rpc: &[u8]) -> Result, Error>; } struct TableRpcHandlerAdapter { table: Arc>, } #[async_trait] impl TableRpcHandler for TableRpcHandlerAdapter { async fn handle(&self, rpc: &[u8]) -> Result, Error> { let msg = rmp_serde::decode::from_read_ref::<_, TableRPC>(rpc)?; let rep = self.table.handle(msg).await?; Ok(rmp_to_vec_all_named(&rep)?) } } #[derive(Serialize, Deserialize)] pub enum TableRPC { Ok, ReadEntry(F::P, F::S), ReadEntryResponse(Option), Update(Vec>), SyncChecksums(Vec), SyncDifferentSet(Vec), } pub trait PartitionKey { fn hash(&self) -> Hash; } pub trait SortKey { fn sort_key(&self) -> &[u8]; } pub trait Entry: PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { fn partition_key(&self) -> &P; fn sort_key(&self) -> &S; fn merge(&mut self, other: &Self); } #[derive(Clone, Serialize, Deserialize)] pub struct EmptySortKey; impl SortKey for EmptySortKey { fn sort_key(&self) -> &[u8] { &[] } } impl> PartitionKey for T { fn hash(&self) -> Hash { hash(self.as_ref().as_bytes()) } } impl> SortKey for T { fn sort_key(&self) -> &[u8] { self.as_ref().as_bytes() } } impl PartitionKey for Hash { fn hash(&self) -> Hash { self.clone() } } impl SortKey for Hash { fn sort_key(&self) -> &[u8] { self.as_slice() } } #[async_trait] pub trait TableSchema: Send + Sync { type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; type E: Entry; async fn updated(&self, old: Option, new: Self::E); } impl Table { pub async fn new( instance: F, system: Arc, db: &sled::Db, name: String, param: TableReplicationParams, ) -> Arc { let store = db.open_tree(&name).expect("Unable to open DB tree"); let table = Arc::new(Self { instance, name, system, store, param, syncer: RwLock::new(None), }); let syncer = TableSyncer::launch(table.clone()).await; *table.syncer.write().await = Some(syncer); table } pub fn rpc_handler(self: Arc) -> Box { Box::new(TableRpcHandlerAdapter:: { table: self }) } pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); let who = self .system .ring .borrow() .clone() .walk_ring(&hash, self.param.replication_factor); eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = &TableRPC::::Update(vec![e_enc]); self.rpc_try_call_many(&who[..], &rpc, self.param.write_quorum) .await?; Ok(()) } pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { let mut call_list = HashMap::new(); for entry in entries.iter() { let hash = entry.partition_key().hash(); let who = self .system .ring .borrow() .clone() .walk_ring(&hash, self.param.replication_factor); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { if !call_list.contains_key(&node) { call_list.insert(node.clone(), vec![]); } call_list.get_mut(&node).unwrap().push(e_enc.clone()); } } let call_futures = call_list.drain().map(|(node, entries)| async move { let rpc = TableRPC::::Update(entries); let rpc_bytes = rmp_to_vec_all_named(&rpc)?; let message = Message::TableRPC(self.name.to_string(), rpc_bytes); let resp = rpc_call(self.system.clone(), &node, &message, self.param.timeout).await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::>(); let mut errors = vec![]; while let Some(resp) = resps.next().await { if let Err(e) = resp { errors.push(e); } } if errors.len() > self.param.replication_factor - self.param.write_quorum { Err(Error::Message("Too many errors".into())) } else { Ok(()) } } pub async fn get( self: &Arc, partition_key: &F::P, sort_key: &F::S, ) -> Result, Error> { let hash = partition_key.hash(); let who = self .system .ring .borrow() .clone() .walk_ring(&hash, self.param.replication_factor); eprintln!("get who: {:?}", who); let rpc = &TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self .rpc_try_call_many(&who[..], &rpc, self.param.read_quorum) .await?; let mut ret = None; let mut not_all_same = false; for resp in resps { if let TableRPC::ReadEntryResponse(value) = resp { if let Some(v_bytes) = value { let v = rmp_serde::decode::from_read_ref::<_, F::E>(v_bytes.as_slice())?; ret = match ret { None => Some(v), Some(mut x) => { if x != v { not_all_same = true; x.merge(&v); } Some(x) } } } } else { return Err(Error::Message(format!("Invalid return value to read"))); } } if let Some(ret_entry) = &ret { if not_all_same { self.system .background .spawn(self.clone().repair_on_read(who, ret_entry.clone())); } } Ok(ret) } async fn repair_on_read(self: Arc, who: Vec, what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); self.rpc_try_call_many(&who[..], &TableRPC::::Update(vec![what_enc]), who.len()) .await?; Ok(()) } async fn rpc_try_call_many( &self, who: &[UUID], rpc: &TableRPC, quorum: usize, ) -> Result>, Error> { eprintln!("Table RPC to {:?}: {}", who, serde_json::to_string(&rpc)?); let rpc_bytes = rmp_to_vec_all_named(rpc)?; let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes); let resps = rpc_try_call_many( self.system.clone(), who, rpc_msg, quorum, self.param.timeout, ) .await?; let mut resps_vals = vec![]; for resp in resps { if let Message::TableRPC(tbl, rep_by) = &resp { if *tbl == self.name { resps_vals.push(rmp_serde::decode::from_read_ref(&rep_by)?); continue; } } return Err(Error::Message(format!( "Invalid reply to TableRPC: {:?}", resp ))); } eprintln!( "Table RPC responses: {}", serde_json::to_string(&resps_vals)? ); Ok(resps_vals) } pub async fn rpc_call(&self, who: &UUID, rpc: &TableRPC) -> Result, Error> { let rpc_bytes = rmp_to_vec_all_named(rpc)?; let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes); let resp = rpc_call(self.system.clone(), who, &rpc_msg, self.param.timeout).await?; if let Message::TableRPC(tbl, rep_by) = &resp { if *tbl == self.name { return Ok(rmp_serde::decode::from_read_ref(&rep_by)?); } } Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp))) } async fn handle(self: &Arc, msg: TableRPC) -> Result, Error> { match msg { TableRPC::ReadEntry(key, sort_key) => { let value = self.handle_read_entry(&key, &sort_key)?; Ok(TableRPC::ReadEntryResponse(value)) } TableRPC::Update(pairs) => { self.handle_update(pairs).await?; Ok(TableRPC::Ok) } TableRPC::SyncChecksums(checksums) => { let syncer = self.syncer.read().await.as_ref().unwrap().clone(); let differing = syncer.handle_checksum_rpc(&checksums[..], self.system.background.stop_signal.clone()).await?; Ok(TableRPC::SyncDifferentSet(differing)) } _ => Err(Error::RPCError(format!("Unexpected table RPC"))), } } fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result, Error> { let tree_key = self.tree_key(p, s); if let Some(bytes) = self.store.get(&tree_key)? { Ok(Some(ByteBuf::from(bytes.to_vec()))) } else { Ok(None) } } async fn handle_update(self: &Arc, mut entries: Vec>) -> Result<(), Error> { for update_bytes in entries.drain(..) { let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?; let tree_key = self.tree_key(update.partition_key(), update.sort_key()); let (old_entry, new_entry) = self.store.transaction(|db| { let (old_entry, new_entry) = match db.get(&tree_key)? { Some(prev_bytes) => { let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes) .map_err(Error::RMPDecode) .map_err(sled::ConflictableTransactionError::Abort)?; let mut new_entry = old_entry.clone(); new_entry.merge(&update); (Some(old_entry), new_entry) } None => (None, update.clone()), }; let new_bytes = rmp_to_vec_all_named(&new_entry) .map_err(Error::RMPEncode) .map_err(sled::ConflictableTransactionError::Abort)?; db.insert(tree_key.clone(), new_bytes)?; Ok((old_entry, new_entry)) })?; if old_entry.as_ref() != Some(&new_entry) { self.instance.updated(old_entry, new_entry).await; let syncer = self.syncer.read().await.as_ref().unwrap().clone(); self.system.background.spawn(syncer.invalidate(tree_key)); } } Ok(()) } pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> { eprintln!("({}) Deleting range {:?} - {:?}", self.name, begin, end); // TODO Ok(()) } fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { let mut ret = p.hash().to_vec(); ret.extend(s.sort_key()); ret } }