aboutsummaryrefslogtreecommitdiff
path: root/src/table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table.rs')
-rw-r--r--src/table.rs101
1 files changed, 80 insertions, 21 deletions
diff --git a/src/table.rs b/src/table.rs
index 36c279e8..9f5eca33 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -1,8 +1,12 @@
-use async_trait::async_trait;
-use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use serde_bytes::ByteBuf;
+use futures::stream::*;
+
use crate::data::*;
use crate::error::Error;
use crate::membership::System;
@@ -52,9 +56,9 @@ pub enum TableRPC<F: TableFormat> {
Ok,
ReadEntry(F::P, F::S),
- ReadEntryResponse(Option<F::E>),
+ ReadEntryResponse(Option<ByteBuf>),
- Update(Vec<F::E>),
+ Update(Vec<Arc<ByteBuf>>),
}
pub struct Partition {
@@ -116,7 +120,7 @@ pub trait TableFormat: Send + Sync {
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
type E: Entry<Self::P, Self::S>;
- async fn updated(&self, old: Option<&Self::E>, new: &Self::E);
+ async fn updated(&self, old: Option<Self::E>, new: Self::E);
}
impl<F: TableFormat + 'static> Table<F> {
@@ -152,13 +156,63 @@ impl<F: TableFormat + 'static> Table<F> {
.walk_ring(&hash, self.param.replication_factor);
eprintln!("insert who: {:?}", who);
- let rpc = &TableRPC::<F>::Update(vec![e.clone()]);
+ let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
+ let rpc = &TableRPC::<F>::Update(vec![e_enc]);
self.rpc_try_call_many(&who[..], &rpc, self.param.write_quorum)
.await?;
Ok(())
}
+ pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> {
+ let mut call_list = HashMap::new();
+
+ for entry in entries.iter() {
+ let hash = entry.partition_key().hash();
+ let who = self
+ .system
+ .members
+ .read()
+ .await
+ .walk_ring(&hash, self.param.replication_factor);
+ let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
+ for node in who {
+ if !call_list.contains_key(&node) {
+ call_list.insert(node.clone(), vec![]);
+ }
+ call_list.get_mut(&node).unwrap().push(e_enc.clone());
+ }
+ }
+
+ let call_futures = call_list.drain()
+ .map(|(node, entries)| async move {
+ let rpc = TableRPC::<F>::Update(entries);
+ let rpc_bytes = rmp_to_vec_all_named(&rpc)?;
+ let message = Message::TableRPC(self.name.to_string(), rpc_bytes);
+
+ let resp = rpc_call(
+ self.system.clone(),
+ &node,
+ &message,
+ self.param.timeout
+ ).await?;
+ Ok::<_, Error>((node, resp))
+ });
+ let mut resps = call_futures.collect::<FuturesUnordered<_>>();
+ let mut errors = vec![];
+
+ while let Some(resp) = resps.next().await {
+ if let Err(e) = resp {
+ errors.push(e);
+ }
+ }
+ if errors.len() > self.param.replication_factor - self.param.write_quorum {
+ Err(Error::Message("Too many errors".into()))
+ } else {
+ Ok(())
+ }
+ }
+
pub async fn get(&self, partition_key: &F::P, sort_key: &F::S) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash();
let who = self
@@ -178,7 +232,8 @@ impl<F: TableFormat + 'static> Table<F> {
let mut not_all_same = false;
for resp in resps {
if let TableRPC::ReadEntryResponse(value) = resp {
- if let Some(v) = value {
+ if let Some(v_bytes) = value {
+ let v = rmp_serde::decode::from_read_ref::<_, F::E>(v_bytes.as_slice())?;
ret = match ret {
None => Some(v),
Some(mut x) => {
@@ -196,19 +251,22 @@ impl<F: TableFormat + 'static> Table<F> {
}
if let Some(ret_entry) = &ret {
if not_all_same {
- // Repair on read
- let _: Result<_, _> = self
- .rpc_try_call_many(
- &who[..],
- &TableRPC::<F>::Update(vec![ret_entry.clone()]),
- who.len(),
- )
- .await;
+ let _: Result<_, _> = self.repair_on_read(&who[..], &ret_entry).await;
}
}
Ok(ret)
}
+ async fn repair_on_read(&self, who: &[UUID], what: &F::E) -> Result<(), Error> {
+ let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(what)?));
+ self.rpc_try_call_many(&who[..],
+ &TableRPC::<F>::Update(vec![what_enc]),
+ who.len(),
+ )
+ .await
+ .map(|_|())
+ }
+
async fn rpc_try_call_many(
&self,
who: &[UUID],
@@ -263,18 +321,19 @@ impl<F: TableFormat + 'static> Table<F> {
}
}
- fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<F::E>, Error> {
+ fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> {
let tree_key = self.tree_key(p, s);
if let Some(bytes) = self.store.get(&tree_key)? {
- let e = rmp_serde::decode::from_read_ref::<_, F::E>(&bytes)?;
- Ok(Some(e))
+ Ok(Some(ByteBuf::from(bytes.to_vec())))
} else {
Ok(None)
}
}
- async fn handle_update(&self, mut entries: Vec<F::E>) -> Result<(), Error> {
- for update in entries.drain(..) {
+ async fn handle_update(&self, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> {
+ for update_bytes in entries.drain(..) {
+ let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?;
+
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
let (old_entry, new_entry) = self.store.transaction(|db| {
@@ -297,7 +356,7 @@ impl<F: TableFormat + 'static> Table<F> {
Ok((old_entry, new_entry))
})?;
- self.instance.updated(old_entry.as_ref(), &new_entry).await;
+ self.instance.updated(old_entry, new_entry).await;
}
Ok(())
}