diff options
Diffstat (limited to 'src/table.rs')
-rw-r--r-- | src/table.rs | 101 |
1 files changed, 80 insertions, 21 deletions
diff --git a/src/table.rs b/src/table.rs index 36c279e8..9f5eca33 100644 --- a/src/table.rs +++ b/src/table.rs @@ -1,8 +1,12 @@ -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use serde_bytes::ByteBuf; +use futures::stream::*; + use crate::data::*; use crate::error::Error; use crate::membership::System; @@ -52,9 +56,9 @@ pub enum TableRPC<F: TableFormat> { Ok, ReadEntry(F::P, F::S), - ReadEntryResponse(Option<F::E>), + ReadEntryResponse(Option<ByteBuf>), - Update(Vec<F::E>), + Update(Vec<Arc<ByteBuf>>), } pub struct Partition { @@ -116,7 +120,7 @@ pub trait TableFormat: Send + Sync { type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; type E: Entry<Self::P, Self::S>; - async fn updated(&self, old: Option<&Self::E>, new: &Self::E); + async fn updated(&self, old: Option<Self::E>, new: Self::E); } impl<F: TableFormat + 'static> Table<F> { @@ -152,13 +156,63 @@ impl<F: TableFormat + 'static> Table<F> { .walk_ring(&hash, self.param.replication_factor); eprintln!("insert who: {:?}", who); - let rpc = &TableRPC::<F>::Update(vec![e.clone()]); + let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); + let rpc = &TableRPC::<F>::Update(vec![e_enc]); self.rpc_try_call_many(&who[..], &rpc, self.param.write_quorum) .await?; Ok(()) } + pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { + let mut call_list = HashMap::new(); + + for entry in entries.iter() { + let hash = entry.partition_key().hash(); + let who = self + .system + .members + .read() + .await + .walk_ring(&hash, self.param.replication_factor); + let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); + for node in who { + if !call_list.contains_key(&node) { + call_list.insert(node.clone(), vec![]); + } + call_list.get_mut(&node).unwrap().push(e_enc.clone()); + } + } + + let call_futures = call_list.drain() + .map(|(node, entries)| async move { + let rpc = TableRPC::<F>::Update(entries); + let rpc_bytes = rmp_to_vec_all_named(&rpc)?; + let message = Message::TableRPC(self.name.to_string(), rpc_bytes); + + let resp = rpc_call( + self.system.clone(), + &node, + &message, + self.param.timeout + ).await?; + Ok::<_, Error>((node, resp)) + }); + let mut resps = call_futures.collect::<FuturesUnordered<_>>(); + let mut errors = vec![]; + + while let Some(resp) = resps.next().await { + if let Err(e) = resp { + errors.push(e); + } + } + if errors.len() > self.param.replication_factor - self.param.write_quorum { + Err(Error::Message("Too many errors".into())) + } else { + Ok(()) + } + } + pub async fn get(&self, partition_key: &F::P, sort_key: &F::S) -> Result<Option<F::E>, Error> { let hash = partition_key.hash(); let who = self @@ -178,7 +232,8 @@ impl<F: TableFormat + 'static> Table<F> { let mut not_all_same = false; for resp in resps { if let TableRPC::ReadEntryResponse(value) = resp { - if let Some(v) = value { + if let Some(v_bytes) = value { + let v = rmp_serde::decode::from_read_ref::<_, F::E>(v_bytes.as_slice())?; ret = match ret { None => Some(v), Some(mut x) => { @@ -196,19 +251,22 @@ impl<F: TableFormat + 'static> Table<F> { } if let Some(ret_entry) = &ret { if not_all_same { - // Repair on read - let _: Result<_, _> = self - .rpc_try_call_many( - &who[..], - &TableRPC::<F>::Update(vec![ret_entry.clone()]), - who.len(), - ) - .await; + let _: Result<_, _> = self.repair_on_read(&who[..], &ret_entry).await; } } Ok(ret) } + async fn repair_on_read(&self, who: &[UUID], what: &F::E) -> Result<(), Error> { + let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(what)?)); + self.rpc_try_call_many(&who[..], + &TableRPC::<F>::Update(vec![what_enc]), + who.len(), + ) + .await + .map(|_|()) + } + async fn rpc_try_call_many( &self, who: &[UUID], @@ -263,18 +321,19 @@ impl<F: TableFormat + 'static> Table<F> { } } - fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<F::E>, Error> { + fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> { let tree_key = self.tree_key(p, s); if let Some(bytes) = self.store.get(&tree_key)? { - let e = rmp_serde::decode::from_read_ref::<_, F::E>(&bytes)?; - Ok(Some(e)) + Ok(Some(ByteBuf::from(bytes.to_vec()))) } else { Ok(None) } } - async fn handle_update(&self, mut entries: Vec<F::E>) -> Result<(), Error> { - for update in entries.drain(..) { + async fn handle_update(&self, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> { + for update_bytes in entries.drain(..) { + let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?; + let tree_key = self.tree_key(update.partition_key(), update.sort_key()); let (old_entry, new_entry) = self.store.transaction(|db| { @@ -297,7 +356,7 @@ impl<F: TableFormat + 'static> Table<F> { Ok((old_entry, new_entry)) })?; - self.instance.updated(old_entry.as_ref(), &new_entry).await; + self.instance.updated(old_entry, new_entry).await; } Ok(()) } |